2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Simplifies the health checker (#317)

Modifies the health checker to use asio::cancel_after rather than a separate parallel group task
This commit is contained in:
Anarthal (Rubén Pérez)
2025-09-27 19:08:19 +02:00
committed by GitHub
parent beab3f69ed
commit e414b3941a
10 changed files with 188 additions and 236 deletions

View File

@@ -7,6 +7,7 @@
#include <boost/redis/connection.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/redirect_error.hpp>

View File

@@ -7,6 +7,7 @@
#include <boost/redis/connection.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>
#include <iostream>

View File

@@ -7,6 +7,7 @@
#include <boost/redis/connection.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>

View File

@@ -6,6 +6,7 @@
#include <boost/redis/connection.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/use_awaitable.hpp>

View File

@@ -8,6 +8,7 @@
#ifndef BOOST_REDIS_ADAPTER_RESULT_HPP
#define BOOST_REDIS_ADAPTER_RESULT_HPP
#include <boost/redis/detail/resp3_type_to_error.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/resp3/type.hpp>
@@ -56,15 +57,9 @@ using result = system::result<Value, error>;
*/
BOOST_NORETURN inline void throw_exception_from_error(error const& e, boost::source_location const&)
{
   // Translate the RESP3 type stored in the error into a library error code
   // through the shared detail::resp3_type_to_error helper instead of a local
   // switch. The helper always yields a non-success code, so the thrown
   // system_error never carries a default-constructed (success) error_code,
   // even for an unexpected data type in builds where assertions are disabled.
   // The server-provided diagnostic string becomes the exception's message prefix.
   throw system::system_error(
      system::error_code(detail::resp3_type_to_error(e.data_type)),
      e.diagnostic);
}
} // namespace boost::redis::adapter

View File

@@ -12,11 +12,11 @@
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/exec_fsm.hpp>
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/redis/detail/redis_stream.hpp>
#include <boost/redis/detail/resp3_type_to_error.hpp>
#include <boost/redis/detail/setup_request_utils.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
@@ -34,6 +34,7 @@
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/immediate.hpp>
@@ -67,7 +68,6 @@ struct connection_impl {
using receive_channel_type = asio::experimental::channel<
Executor,
void(system::error_code, std::size_t)>;
using health_checker_type = detail::health_checker<Executor>;
using exec_notifier_type = asio::experimental::channel<
Executor,
void(system::error_code, std::size_t)>;
@@ -79,14 +79,16 @@ struct connection_impl {
// not suspend.
timer_type writer_timer_;
timer_type reconnect_timer_; // to wait the reconnection period
timer_type ping_timer_; // to wait between pings
receive_channel_type receive_channel_;
health_checker_type health_checker_;
config cfg_;
multiplexer mpx_;
connection_logger logger_;
read_buffer read_buffer_;
generic_response setup_resp_;
request ping_req_;
generic_response ping_resp_;
using executor_type = Executor;
@@ -131,8 +133,8 @@ struct connection_impl {
: stream_{ex, std::move(ctx)}
, writer_timer_{ex}
, reconnect_timer_{ex}
, ping_timer_{ex}
, receive_channel_{ex, 256}
, health_checker_{ex}
, logger_{std::move(lgr)}
{
set_receive_adapter(any_adapter{ignore});
@@ -153,11 +155,11 @@ struct connection_impl {
break;
case operation::run: cancel_run(); break;
case operation::receive: receive_channel_.cancel(); break;
case operation::health_check: health_checker_.cancel(); break;
case operation::health_check: ping_timer_.cancel(); break;
case operation::all:
stream_.cancel_resolve();
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
health_checker_.cancel();
ping_timer_.cancel();
cancel_run(); // run
receive_channel_.cancel(); // receive
mpx_.cancel_waiting(); // exec
@@ -298,6 +300,112 @@ public:
}
};
// Composed operation implementing the connection health check.
//
// While running, it repeatedly sends the PING request stored in the connection
// (ping_req_), bounding each exchange with asio::cancel_after on ping_timer_,
// and then sleeps for cfg_.health_check_interval before pinging again.
// It completes with:
//   - a default (success) error_code immediately if health checks are disabled
//     (health_check_interval == 0);
//   - asio::error::operation_aborted if the operation itself was cancelled;
//   - error::pong_timeout if a PING was aborted without the operation being
//     cancelled (i.e. the cancel_after timeout fired);
//   - the I/O error, or the error mapped from the server's error reply, otherwise.
template <class Executor>
struct health_checker_op {
   connection_impl<Executor>* conn_;
   asio::coroutine coro_{};

   // Maps the outcome of one PING exchange to the code this operation should
   // complete with. Returns a default-constructed error_code when the PING succeeded.
   // Must only be called after external cancellation has been ruled out, so that
   // operation_aborted can be attributed to the ping timeout.
   system::error_code check_errors(system::error_code io_ec)
   {
      // Did we have a timeout?
      // Report it as pong_timeout: that is the code run_op looks for to decide
      // that the health check (and not the reader) caused the run to finish.
      if (io_ec == asio::error::operation_aborted) {
         conn_->logger_.log(logger::level::info, "Health checker: ping timed out");
         return error::pong_timeout;
      }

      // Did we have other unknown error?
      if (io_ec) {
         conn_->logger_.log(logger::level::info, "Health checker: ping error", io_ec);
         return io_ec;
      }

      // Did the server answer with an error?
      if (conn_->ping_resp_.has_error()) {
         // Named resp_error (not error) to avoid shadowing the redis::error enum.
         auto resp_error = conn_->ping_resp_.error();
         conn_->logger_.log(
            logger::level::info,
            "Health checker: server answered ping with an error",
            resp_error.diagnostic);
         return resp3_type_to_error(resp_error.data_type);
      }

      // No error
      return system::error_code();
   }

public:
   health_checker_op(connection_impl<Executor>& conn) noexcept
   : conn_{&conn}
   { }

   // Coroutine body. `ec` is the result of the last awaited operation; the
   // std::size_t argument (bytes count reported by async_exec) is ignored.
   template <class Self>
   void operator()(Self& self, system::error_code ec = {}, std::size_t = {})
   {
      BOOST_ASIO_CORO_REENTER(coro_)
      {
         if (conn_->cfg_.health_check_interval == std::chrono::seconds::zero()) {
            conn_->logger_.trace("ping_op (1): timeout disabled.");

            // Complete through the executor rather than inline from the initiating call.
            BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self));
            self.complete(system::error_code{});
            return;
         }

         for (;;) {
            // Clean up any previous leftover
            clear_response(conn_->ping_resp_);

            // Execute the request
            BOOST_ASIO_CORO_YIELD
            {
               auto* conn = conn_;  // avoid use-after-move problems
               auto timeout = conn->cfg_.health_check_interval;
               conn->async_exec(
                  conn->ping_req_,
                  any_adapter{conn->ping_resp_},
                  asio::cancel_after(conn->ping_timer_, timeout, std::move(self)));
            }

            // Check for cancellations
            // This must precede check_errors(): both an external cancellation and a
            // ping timeout surface as operation_aborted in ec.
            if (is_cancelled(self)) {
               conn_->logger_.trace("ping_op (2): cancelled");
               self.complete(asio::error::operation_aborted);
               return;
            }

            // Check for errors in PING
            ec = check_errors(ec);
            if (ec) {
               self.complete(ec);
               return;
            }

            // Wait before pinging again.
            conn_->ping_timer_.expires_after(conn_->cfg_.health_check_interval);

            BOOST_ASIO_CORO_YIELD
            conn_->ping_timer_.async_wait(std::move(self));
            if (ec) {
               conn_->logger_.trace("ping_op (3)", ec);
               self.complete(ec);
               return;
            }

            if (is_cancelled(self)) {
               conn_->logger_.trace("ping_op (4): cancelled");
               self.complete(asio::error::operation_aborted);
               return;
            }
         }
      }
   }
};
// Resets `to` and fills it with a single PING command whose argument is the
// health-check identifier taken from `cfg`.
inline void compose_ping_request(const config& cfg, request& to)
{
   const auto& ping_argument = cfg.health_check_id;

   to.clear();  // drop whatever a previous run left in the request
   to.push("PING", ping_argument);
}
inline system::error_code check_config(const config& cfg)
{
if (!cfg.unix_socket.empty()) {
@@ -317,7 +425,7 @@ private:
asio::coroutine coro_{};
system::error_code stored_ec_;
using order_t = std::array<std::size_t, 5>;
using order_t = std::array<std::size_t, 4>;
static system::error_code on_setup_finished(
connection_impl<Executor>& conn,
@@ -370,6 +478,15 @@ private:
conn_->writer_timer_);
}
template <class CompletionToken>
auto health_checker(CompletionToken&& token)
{
return asio::async_compose<CompletionToken, void(system::error_code)>(
health_checker_op<Executor>{*conn_},
std::forward<CompletionToken>(token),
conn_->writer_timer_);
}
public:
run_op(connection_impl<Executor>* conn) noexcept
: conn_{conn}
@@ -380,23 +497,22 @@ public:
void operator()(
Self& self,
order_t order,
system::error_code ec0,
system::error_code ec1,
system::error_code ec2,
system::error_code ec3,
system::error_code)
system::error_code setup_ec,
system::error_code health_check_ec,
system::error_code reader_ec,
system::error_code /* writer_ec */)
{
system::error_code final_ec;
if (order[0] == 0 && !!ec0) {
if (order[0] == 0 && !!setup_ec) {
// The setup op finished first and with an error
final_ec = ec0;
} else if (order[0] == 2 && ec2 == error::pong_timeout) {
final_ec = setup_ec;
} else if (order[0] == 1 && health_check_ec == error::pong_timeout) {
// The check ping timeout finished first. Use the ping error code
final_ec = ec1;
final_ec = health_check_ec;
} else {
// Use the reader error code
final_ec = ec3;
final_ec = reader_ec;
}
(*this)(self, final_ec);
@@ -421,6 +537,9 @@ public:
// Compose the setup request. This only depends on the config, so it can be done just once
compose_setup_request(conn_->cfg_);
// Compose the PING request. Same as above
compose_ping_request(conn_->cfg_, conn_->ping_req_);
for (;;) {
// Try to connect
BOOST_ASIO_CORO_YIELD
@@ -442,10 +561,7 @@ public:
return this->send_setup(token);
},
[this](auto token) {
return conn_->health_checker_.async_ping(*conn_, token);
},
[this](auto token) {
return conn_->health_checker_.async_check_timeout(*conn_, token);
return this->health_checker(token);
},
[this](auto token) {
return this->reader(token);
@@ -641,7 +757,6 @@ public:
auto async_run(config const& cfg, CompletionToken&& token = {})
{
impl_->cfg_ = cfg;
impl_->health_checker_.set_config(cfg);
impl_->read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size});
return asio::async_compose<CompletionToken, void(system::error_code)>(
@@ -956,7 +1071,6 @@ private:
using receive_channel_type = asio::experimental::channel<
executor_type,
void(system::error_code, std::size_t)>;
using health_checker_type = detail::health_checker<Executor>;
auto use_ssl() const noexcept { return impl_->cfg_.use_ssl; }

View File

@@ -41,6 +41,7 @@ public:
void on_fsm_resume(reader_fsm::action const& action);
void on_setup(system::error_code const& ec, generic_response const& resp);
void log(logger::level lvl, std::string_view msg);
void log(logger::level lvl, std::string_view msg1, std::string_view msg2);
void log(logger::level lvl, std::string_view op, system::error_code const& ec);
void trace(std::string_view message) { log(logger::level::debug, message); }
void trace(std::string_view op, system::error_code const& ec)

View File

@@ -1,204 +0,0 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_HEALTH_CHECKER_HPP
#define BOOST_REDIS_HEALTH_CHECKER_HPP
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
namespace boost::redis::detail {
// Composed-operation body that periodically executes the checker's PING request
// on the connection.
//
// Each iteration runs checker_->req_ through conn_->async_exec (storing the reply
// in checker_->resp_) and then waits checker_->ping_interval_ on the checker's
// ping_timer_. The operation completes with success when pinging is disabled or
// the companion timeout check has flagged that it exited, and with the failing
// error code otherwise.
template <class HealthChecker, class ConnectionImpl>
class ping_op {
public:
HealthChecker* checker_ = nullptr;
ConnectionImpl* conn_ = nullptr;
asio::coroutine coro_{};
// Coroutine entry point; `ec` carries the result of the last awaited operation
// and the unnamed std::size_t argument is ignored.
template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER(coro_) for (;;)
{
// A zero ping interval means health checks are disabled: post and finish successfully.
if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
conn_->logger_.trace("ping_op (1): timeout disabled.");
BOOST_ASIO_CORO_YIELD
asio::post(std::move(self));
self.complete({});
return;
}
// checker_has_exited_ is set by check_timeout_op when it detects a pong timeout.
if (checker_->checker_has_exited_) {
conn_->logger_.trace("ping_op (2): checker has exited.");
self.complete({});
return;
}
// Send the PING and wait for the reply.
BOOST_ASIO_CORO_YIELD
conn_->async_exec(
checker_->req_,
any_adapter{checker_->resp_},
std::move(self));
if (ec) {
conn_->logger_.trace("ping_op (3)", ec);
// The ping failed: also cancel the timer the timeout check is waiting on.
checker_->wait_timer_.cancel();
self.complete(ec);
return;
}
// Wait before pinging again.
checker_->ping_timer_.expires_after(checker_->ping_interval_);
BOOST_ASIO_CORO_YIELD
checker_->ping_timer_.async_wait(std::move(self));
if (ec) {
conn_->logger_.trace("ping_op (4)", ec);
self.complete(ec);
return;
}
// The wait may report success even though cancellation was requested on the operation.
if (is_cancelled(self)) {
conn_->logger_.trace("ping_op (5): cancelled");
self.complete(asio::error::operation_aborted);
return;
}
}
}
};
// Composed-operation body that detects missing PING replies.
//
// Every 2 * checker_->ping_interval_ it inspects checker_->resp_: an empty value
// is treated as a pong timeout, which cancels the ping timer and the connection's
// run operation and completes with error::pong_timeout. A non-empty value is
// cleared and the loop waits again.
template <class HealthChecker, class Connection>
class check_timeout_op {
public:
HealthChecker* checker_ = nullptr;
Connection* conn_ = nullptr;
asio::coroutine coro_{};
// Coroutine entry point; `ec` carries the result of the last awaited operation.
template <class Self>
void operator()(Self& self, system::error_code ec = {})
{
BOOST_ASIO_CORO_REENTER(coro_) for (;;)
{
// A zero ping interval means health checks are disabled: post and finish successfully.
if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
conn_->logger_.trace("check_timeout_op (1): timeout disabled.");
BOOST_ASIO_CORO_YIELD
asio::post(std::move(self));
self.complete({});
return;
}
// Sleep for twice the ping interval before looking at the response.
checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
BOOST_ASIO_CORO_YIELD
checker_->wait_timer_.async_wait(std::move(self));
if (ec) {
conn_->logger_.trace("check_timeout_op (2)", ec);
self.complete(ec);
return;
}
// A response holding an error ends the check with a success code.
if (checker_->resp_.has_error()) {
// TODO: Log the error.
conn_->logger_.trace("check_timeout_op (3): Response error.");
self.complete({});
return;
}
// Nothing was stored in the response during the wait: treat it as a pong timeout.
if (checker_->resp_.value().empty()) {
conn_->logger_.trace("check_timeout_op (4): pong timeout.");
checker_->ping_timer_.cancel();
conn_->cancel(operation::run);
// Tells ping_op to stop on its next iteration.
checker_->checker_has_exited_ = true;
self.complete(error::pong_timeout);
return;
}
// Discard the stored reply so the next iteration can tell whether a new one arrived.
if (checker_->resp_.has_value()) {
checker_->resp_.value().clear();
}
}
}
};
template <class Executor>
class health_checker {
private:
using timer_type = asio::basic_waitable_timer<
std::chrono::steady_clock,
asio::wait_traits<std::chrono::steady_clock>,
Executor>;
public:
health_checker(Executor ex)
: ping_timer_{ex}
, wait_timer_{ex}
{
req_.push("PING", "Boost.Redis");
}
void set_config(config const& cfg)
{
req_.clear();
req_.push("PING", cfg.health_check_id);
ping_interval_ = cfg.health_check_interval;
}
void cancel()
{
ping_timer_.cancel();
wait_timer_.cancel();
}
template <class ConnectionImpl, class CompletionToken>
auto async_ping(ConnectionImpl& conn, CompletionToken token)
{
return asio::async_compose<CompletionToken, void(system::error_code)>(
ping_op<health_checker, ConnectionImpl>{this, &conn},
token,
conn,
ping_timer_);
}
template <class Connection, class CompletionToken>
auto async_check_timeout(Connection& conn, CompletionToken token)
{
checker_has_exited_ = false;
return asio::async_compose<CompletionToken, void(system::error_code)>(
check_timeout_op<health_checker, Connection>{this, &conn},
token,
conn,
wait_timer_);
}
private:
template <class, class> friend class ping_op;
template <class, class> friend class check_timeout_op;
timer_type ping_timer_;
timer_type wait_timer_;
redis::request req_;
redis::generic_response resp_;
std::chrono::steady_clock::duration ping_interval_ = std::chrono::seconds{5};
bool checker_has_exited_ = false;
};
} // namespace boost::redis::detail
#endif // BOOST_REDIS_HEALTH_CHECKER_HPP

View File

@@ -0,0 +1,29 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_RESP3_TYPE_TO_ERROR_HPP
#define BOOST_RESP3_TYPE_TO_ERROR_HPP
#include <boost/redis/error.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/assert.hpp>
namespace boost::redis::detail {
// Maps a RESP3 type that denotes a failed response (simple error, blob error
// or null) to the matching library error code. Any other type is a programming
// error: it trips an assertion and falls back to resp3_simple_error.
inline error resp3_type_to_error(resp3::type t)
{
   if (t == resp3::type::simple_error)
      return error::resp3_simple_error;

   if (t == resp3::type::blob_error)
      return error::resp3_blob_error;

   if (t == resp3::type::null)
      return error::resp3_null;

   BOOST_ASSERT_MSG(false, "Unexpected data type.");
   return error::resp3_simple_error;
}
} // namespace boost::redis::detail
#endif // BOOST_RESP3_TYPE_TO_ERROR_HPP

View File

@@ -12,6 +12,7 @@
#include <boost/system/error_code.hpp>
#include <string>
#include <string_view>
namespace boost::redis::detail {
@@ -201,6 +202,18 @@ void connection_logger::log(logger::level lvl, std::string_view message)
logger_.fn(lvl, message);
}
// Logs "<message1>: <message2>" at level `lvl`, provided the configured logger
// level admits it. The text is assembled in the msg_ member.
void connection_logger::log(logger::level lvl, std::string_view message1, std::string_view message2)
{
   const bool filtered_out = logger_.lvl < lvl;
   if (!filtered_out) {
      msg_ = message1;
      msg_ += ": ";
      msg_ += message2;

      logger_.fn(lvl, msg_);
   }
}
void connection_logger::log(logger::level lvl, std::string_view op, system::error_code const& ec)
{
if (logger_.lvl < lvl)