diff --git a/example/cpp20_chat_room.cpp b/example/cpp20_chat_room.cpp index 17dfd0f5..a3a960df 100644 --- a/example/cpp20_chat_room.cpp +++ b/example/cpp20_chat_room.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include diff --git a/example/cpp20_containers.cpp b/example/cpp20_containers.cpp index c21675d8..7c648e38 100644 --- a/example/cpp20_containers.cpp +++ b/example/cpp20_containers.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include diff --git a/example/cpp20_echo_server.cpp b/example/cpp20_echo_server.cpp index dca86bf3..217c0a9f 100644 --- a/example/cpp20_echo_server.cpp +++ b/example/cpp20_echo_server.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include diff --git a/example/cpp20_resolve_with_sentinel.cpp b/example/cpp20_resolve_with_sentinel.cpp index e1bae87c..3d443402 100644 --- a/example/cpp20_resolve_with_sentinel.cpp +++ b/example/cpp20_resolve_with_sentinel.cpp @@ -6,6 +6,7 @@ #include +#include #include #include #include diff --git a/include/boost/redis/adapter/result.hpp b/include/boost/redis/adapter/result.hpp index 28290500..66069475 100644 --- a/include/boost/redis/adapter/result.hpp +++ b/include/boost/redis/adapter/result.hpp @@ -8,6 +8,7 @@ #ifndef BOOST_REDIS_ADAPTER_RESULT_HPP #define BOOST_REDIS_ADAPTER_RESULT_HPP +#include #include #include @@ -56,15 +57,9 @@ using result = system::result; */ BOOST_NORETURN inline void throw_exception_from_error(error const& e, boost::source_location const&) { - system::error_code ec; - switch (e.data_type) { - case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break; - case resp3::type::blob_error: ec = redis::error::resp3_blob_error; break; - case resp3::type::null: ec = redis::error::resp3_null; break; - default: BOOST_ASSERT_MSG(false, "Unexpected data type."); - } - - throw system::system_error(ec, e.diagnostic); + throw system::system_error( + system::error_code(detail::resp3_type_to_error(e.data_type)), + e.diagnostic); } } // namespace boost::redis::adapter diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index b283af27..ae2a35cf 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -12,11 +12,11 @@ #include #include #include -#include #include #include #include #include +#include #include #include #include @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -67,7 +68,6 @@ struct connection_impl { using receive_channel_type = asio::experimental::channel< Executor, void(system::error_code, std::size_t)>; - using health_checker_type = detail::health_checker; using exec_notifier_type = asio::experimental::channel< Executor, void(system::error_code, std::size_t)>; @@ -79,14 +79,16 @@ struct connection_impl { // not suspend. timer_type writer_timer_; timer_type reconnect_timer_; // to wait the reconnection period + timer_type ping_timer_; // to wait between pings receive_channel_type receive_channel_; - health_checker_type health_checker_; config cfg_; multiplexer mpx_; connection_logger logger_; read_buffer read_buffer_; generic_response setup_resp_; + request ping_req_; + generic_response ping_resp_; using executor_type = Executor; @@ -131,8 +133,8 @@ struct connection_impl { : stream_{ex, std::move(ctx)} , writer_timer_{ex} , reconnect_timer_{ex} + , ping_timer_{ex} , receive_channel_{ex, 256} - , health_checker_{ex} , logger_{std::move(lgr)} { set_receive_adapter(any_adapter{ignore}); @@ -153,11 +155,11 @@ struct connection_impl { break; case operation::run: cancel_run(); break; case operation::receive: receive_channel_.cancel(); break; - case operation::health_check: health_checker_.cancel(); break; + case operation::health_check: ping_timer_.cancel(); break; case operation::all: stream_.cancel_resolve(); cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); - health_checker_.cancel(); + ping_timer_.cancel(); cancel_run(); // run receive_channel_.cancel(); // receive mpx_.cancel_waiting(); // exec @@ -298,6 +300,112 @@ public: } }; +template +struct health_checker_op { + connection_impl* conn_; + asio::coroutine coro_{}; + + system::error_code check_errors(system::error_code io_ec) + { + // Did we have a timeout? + if (io_ec == asio::error::operation_aborted) { + conn_->logger_.log(logger::level::info, "Health checker: ping timed out"); + return asio::error::operation_aborted; + } + + // Did we have other unknown error? + if (io_ec) { + conn_->logger_.log(logger::level::info, "Health checker: ping error", io_ec); + return io_ec; + } + + // Did the server answer with an error? + if (conn_->ping_resp_.has_error()) { + auto error = conn_->ping_resp_.error(); + conn_->logger_.log( + logger::level::info, + "Health checker: server answered ping with an error", + error.diagnostic); + return resp3_type_to_error(error.data_type); + } + + // No error + return system::error_code(); + } + +public: + health_checker_op(connection_impl& conn) noexcept + : conn_{&conn} + { } + + template + void operator()(Self& self, system::error_code ec = {}, std::size_t = {}) + { + BOOST_ASIO_CORO_REENTER(coro_) + { + if (conn_->cfg_.health_check_interval == std::chrono::seconds::zero()) { + conn_->logger_.trace("ping_op (1): timeout disabled."); + BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self)); + self.complete(system::error_code{}); + return; + } + + for (;;) { + // Clean up any previous leftover + clear_response(conn_->ping_resp_); + + // Execute the request + BOOST_ASIO_CORO_YIELD + { + auto* conn = conn_; // avoid use-after-move problems + auto timeout = conn->cfg_.health_check_interval; + conn->async_exec( + conn->ping_req_, + any_adapter{conn->ping_resp_}, + asio::cancel_after(conn->ping_timer_, timeout, std::move(self))); + } + + // Check for cancellations + if (is_cancelled(self)) { + conn_->logger_.trace("ping_op (2): cancelled"); + self.complete(asio::error::operation_aborted); + return; + } + + // Check for errors in PING + ec = check_errors(ec); + if (ec) { + self.complete(ec); + return; + } + + // Wait before pinging again. + conn_->ping_timer_.expires_after(conn_->cfg_.health_check_interval); + + BOOST_ASIO_CORO_YIELD + conn_->ping_timer_.async_wait(std::move(self)); + if (ec) { + conn_->logger_.trace("ping_op (3)", ec); + self.complete(ec); + return; + } + + if (is_cancelled(self)) { + conn_->logger_.trace("ping_op (4): cancelled"); + self.complete(asio::error::operation_aborted); + return; + } + } + } + } +}; + +inline void compose_ping_request(const config& cfg, request& to) +{ + to.clear(); + to.push("PING", cfg.health_check_id); +} + inline system::error_code check_config(const config& cfg) { if (!cfg.unix_socket.empty()) { @@ -317,7 +425,7 @@ private: asio::coroutine coro_{}; system::error_code stored_ec_; - using order_t = std::array; + using order_t = std::array; static system::error_code on_setup_finished( connection_impl& conn, @@ -370,6 +478,15 @@ private: conn_->writer_timer_); } + template + auto health_checker(CompletionToken&& token) + { + return asio::async_compose( + health_checker_op{*conn_}, + std::forward(token), + conn_->writer_timer_); + } + public: run_op(connection_impl* conn) noexcept : conn_{conn} @@ -380,23 +497,22 @@ public: void operator()( Self& self, order_t order, - system::error_code ec0, - system::error_code ec1, - system::error_code ec2, - system::error_code ec3, - system::error_code) + system::error_code setup_ec, + system::error_code health_check_ec, + system::error_code reader_ec, + system::error_code /* writer_ec */) { system::error_code final_ec; - if (order[0] == 0 && !!ec0) { + if (order[0] == 0 && !!setup_ec) { // The setup op finished first and with an error - final_ec = ec0; - } else if (order[0] == 2 && ec2 == error::pong_timeout) { + final_ec = setup_ec; + } else if (order[0] == 1 && health_check_ec == error::pong_timeout) { // The check ping timeout finished first. Use the ping error code - final_ec = ec1; + final_ec = health_check_ec; } else { // Use the reader error code - final_ec = ec3; + final_ec = reader_ec; } (*this)(self, final_ec); @@ -421,6 +537,9 @@ public: // Compose the setup request. This only depends on the config, so it can be done just once compose_setup_request(conn_->cfg_); + // Compose the PING request. Same as above + compose_ping_request(conn_->cfg_, conn_->ping_req_); + for (;;) { // Try to connect BOOST_ASIO_CORO_YIELD @@ -442,10 +561,7 @@ public: return this->send_setup(token); }, [this](auto token) { - return conn_->health_checker_.async_ping(*conn_, token); - }, - [this](auto token) { - return conn_->health_checker_.async_check_timeout(*conn_, token); + return this->health_checker(token); }, [this](auto token) { return this->reader(token); @@ -641,7 +757,6 @@ public: auto async_run(config const& cfg, CompletionToken&& token = {}) { impl_->cfg_ = cfg; - impl_->health_checker_.set_config(cfg); impl_->read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size}); return asio::async_compose( @@ -956,7 +1071,6 @@ private: using receive_channel_type = asio::experimental::channel< executor_type, void(system::error_code, std::size_t)>; - using health_checker_type = detail::health_checker; auto use_ssl() const noexcept { return impl_->cfg_.use_ssl; } diff --git a/include/boost/redis/detail/connection_logger.hpp b/include/boost/redis/detail/connection_logger.hpp index 18637d73..4ebfd9d5 100644 --- a/include/boost/redis/detail/connection_logger.hpp +++ b/include/boost/redis/detail/connection_logger.hpp @@ -41,6 +41,7 @@ public: void on_fsm_resume(reader_fsm::action const& action); void on_setup(system::error_code const& ec, generic_response const& resp); void log(logger::level lvl, std::string_view msg); + void log(logger::level lvl, std::string_view msg1, std::string_view msg2); void log(logger::level lvl, std::string_view op, system::error_code const& ec); void trace(std::string_view message) { log(logger::level::debug, message); } void trace(std::string_view op, system::error_code const& ec) diff --git a/include/boost/redis/detail/health_checker.hpp b/include/boost/redis/detail/health_checker.hpp deleted file mode 100644 index 0351ad5a..00000000 --- a/include/boost/redis/detail/health_checker.hpp +++ /dev/null @@ -1,204 +0,0 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#ifndef BOOST_REDIS_HEALTH_CHECKER_HPP -#define BOOST_REDIS_HEALTH_CHECKER_HPP - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -namespace boost::redis::detail { - -template -class ping_op { -public: - HealthChecker* checker_ = nullptr; - ConnectionImpl* conn_ = nullptr; - asio::coroutine coro_{}; - - template - void operator()(Self& self, system::error_code ec = {}, std::size_t = 0) - { - BOOST_ASIO_CORO_REENTER(coro_) for (;;) - { - if (checker_->ping_interval_ == std::chrono::seconds::zero()) { - conn_->logger_.trace("ping_op (1): timeout disabled."); - BOOST_ASIO_CORO_YIELD - asio::post(std::move(self)); - self.complete({}); - return; - } - - if (checker_->checker_has_exited_) { - conn_->logger_.trace("ping_op (2): checker has exited."); - self.complete({}); - return; - } - - BOOST_ASIO_CORO_YIELD - conn_->async_exec( - checker_->req_, - any_adapter{checker_->resp_}, - std::move(self)); - if (ec) { - conn_->logger_.trace("ping_op (3)", ec); - checker_->wait_timer_.cancel(); - self.complete(ec); - return; - } - - // Wait before pinging again. - checker_->ping_timer_.expires_after(checker_->ping_interval_); - - BOOST_ASIO_CORO_YIELD - checker_->ping_timer_.async_wait(std::move(self)); - if (ec) { - conn_->logger_.trace("ping_op (4)", ec); - self.complete(ec); - return; - } - - if (is_cancelled(self)) { - conn_->logger_.trace("ping_op (5): cancelled"); - self.complete(asio::error::operation_aborted); - return; - } - } - } -}; - -template -class check_timeout_op { -public: - HealthChecker* checker_ = nullptr; - Connection* conn_ = nullptr; - asio::coroutine coro_{}; - - template - void operator()(Self& self, system::error_code ec = {}) - { - BOOST_ASIO_CORO_REENTER(coro_) for (;;) - { - if (checker_->ping_interval_ == std::chrono::seconds::zero()) { - conn_->logger_.trace("check_timeout_op (1): timeout disabled."); - BOOST_ASIO_CORO_YIELD - asio::post(std::move(self)); - self.complete({}); - return; - } - - checker_->wait_timer_.expires_after(2 * checker_->ping_interval_); - - BOOST_ASIO_CORO_YIELD - checker_->wait_timer_.async_wait(std::move(self)); - if (ec) { - conn_->logger_.trace("check_timeout_op (2)", ec); - self.complete(ec); - return; - } - - if (checker_->resp_.has_error()) { - // TODO: Log the error. - conn_->logger_.trace("check_timeout_op (3): Response error."); - self.complete({}); - return; - } - - if (checker_->resp_.value().empty()) { - conn_->logger_.trace("check_timeout_op (4): pong timeout."); - checker_->ping_timer_.cancel(); - conn_->cancel(operation::run); - checker_->checker_has_exited_ = true; - self.complete(error::pong_timeout); - return; - } - - if (checker_->resp_.has_value()) { - checker_->resp_.value().clear(); - } - } - } -}; - -template -class health_checker { -private: - using timer_type = asio::basic_waitable_timer< - std::chrono::steady_clock, - asio::wait_traits, - Executor>; - -public: - health_checker(Executor ex) - : ping_timer_{ex} - , wait_timer_{ex} - { - req_.push("PING", "Boost.Redis"); - } - - void set_config(config const& cfg) - { - req_.clear(); - req_.push("PING", cfg.health_check_id); - ping_interval_ = cfg.health_check_interval; - } - - void cancel() - { - ping_timer_.cancel(); - wait_timer_.cancel(); - } - - template - auto async_ping(ConnectionImpl& conn, CompletionToken token) - { - return asio::async_compose( - ping_op{this, &conn}, - token, - conn, - ping_timer_); - } - - template - auto async_check_timeout(Connection& conn, CompletionToken token) - { - checker_has_exited_ = false; - return asio::async_compose( - check_timeout_op{this, &conn}, - token, - conn, - wait_timer_); - } - -private: - template friend class ping_op; - template friend class check_timeout_op; - - timer_type ping_timer_; - timer_type wait_timer_; - redis::request req_; - redis::generic_response resp_; - std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5}; - bool checker_has_exited_ = false; -}; - -} // namespace boost::redis::detail - -#endif // BOOST_REDIS_HEALTH_CHECKER_HPP diff --git a/include/boost/redis/detail/resp3_type_to_error.hpp b/include/boost/redis/detail/resp3_type_to_error.hpp new file mode 100644 index 00000000..36ce9283 --- /dev/null +++ b/include/boost/redis/detail/resp3_type_to_error.hpp @@ -0,0 +1,29 @@ +/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_RESP3_TYPE_TO_ERROR_HPP +#define BOOST_RESP3_TYPE_TO_ERROR_HPP + +#include +#include + +#include + +namespace boost::redis::detail { + +inline error resp3_type_to_error(resp3::type t) +{ + switch (t) { + case resp3::type::simple_error: return error::resp3_simple_error; + case resp3::type::blob_error: return error::resp3_blob_error; + case resp3::type::null: return error::resp3_null; + default: BOOST_ASSERT_MSG(false, "Unexpected data type."); return error::resp3_simple_error; + } +} + +} // namespace boost::redis::detail + +#endif // BOOST_REDIS_ADAPTER_RESULT_HPP diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index 2ce42420..d0c6cf67 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -12,6 +12,7 @@ #include #include +#include namespace boost::redis::detail { @@ -201,6 +202,18 @@ void connection_logger::log(logger::level lvl, std::string_view message) logger_.fn(lvl, message); } +void connection_logger::log(logger::level lvl, std::string_view message1, std::string_view message2) +{ + if (logger_.lvl < lvl) + return; + + msg_ = message1; + msg_ += ": "; + msg_ += message2; + + logger_.fn(lvl, msg_); +} + void connection_logger::log(logger::level lvl, std::string_view op, system::error_code const& ec) { if (logger_.lvl < lvl)