mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Merge pull request #89 from boostorg/88-simplify-async_check_health-with-asioconsign
Uses consign to simplify the check-health operation.
This commit is contained in:
@@ -16,6 +16,7 @@
|
||||
#include <boost/asio/experimental/use_promise.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/compose.hpp>
|
||||
#include <boost/asio/consign.hpp>
|
||||
#include <memory>
|
||||
#include <chrono>
|
||||
#include <optional>
|
||||
@@ -23,79 +24,84 @@
|
||||
namespace boost::redis {
|
||||
namespace detail {
|
||||
|
||||
template <class Connection>
|
||||
template <class HealthChecker, class Connection>
|
||||
class check_health_op {
|
||||
private:
|
||||
using executor_type = typename Connection::executor_type;
|
||||
|
||||
struct state {
|
||||
using promise_type = asio::experimental::promise<void(system::error_code, std::size_t), executor_type>;
|
||||
using timer_type =
|
||||
asio::basic_waitable_timer<
|
||||
std::chrono::steady_clock,
|
||||
asio::wait_traits<std::chrono::steady_clock>,
|
||||
executor_type>;
|
||||
|
||||
|
||||
timer_type timer_;
|
||||
redis::request req_;
|
||||
redis::generic_response resp_;
|
||||
std::optional<promise_type> prom_;
|
||||
std::chrono::steady_clock::duration interval_;
|
||||
|
||||
state(
|
||||
executor_type ex,
|
||||
std::string const& msg,
|
||||
std::chrono::steady_clock::duration interval)
|
||||
: timer_{ex}
|
||||
, interval_{interval}
|
||||
{
|
||||
req_.push("PING", msg);
|
||||
}
|
||||
|
||||
void reset()
|
||||
{
|
||||
resp_.value().clear();
|
||||
prom_.reset();
|
||||
}
|
||||
};
|
||||
|
||||
Connection* conn_ = nullptr;
|
||||
std::shared_ptr<state> state_ = nullptr;
|
||||
asio::coroutine coro_{};
|
||||
|
||||
public:
|
||||
check_health_op(
|
||||
Connection& conn,
|
||||
std::string const& msg,
|
||||
std::chrono::steady_clock::duration interval)
|
||||
: conn_{&conn}
|
||||
, state_{std::make_shared<state>(conn.get_executor(), msg, interval)}
|
||||
{ }
|
||||
HealthChecker* checker = nullptr;
|
||||
Connection* conn = nullptr;
|
||||
asio::coroutine coro_{};
|
||||
|
||||
template <class Self>
|
||||
void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
|
||||
{
|
||||
state_->prom_.emplace(conn_->async_exec(state_->req_, state_->resp_, asio::experimental::use_promise));
|
||||
checker->prom_.emplace(conn->async_exec(checker->req_, checker->resp_, asio::experimental::use_promise));
|
||||
|
||||
state_->timer_.expires_after(state_->interval_);
|
||||
checker->timer_.expires_after(checker->interval_);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
state_->timer_.async_wait(std::move(self));
|
||||
if (ec || is_cancelled(self) || state_->resp_.value().empty()) {
|
||||
conn_->cancel(operation::run);
|
||||
checker->timer_.async_wait(std::move(self));
|
||||
if (ec || is_cancelled(self) || checker->resp_.value().empty()) {
|
||||
conn->cancel(operation::run);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
std::move(*state_->prom_)(std::move(self));
|
||||
std::move(*checker->prom_)(std::move(self));
|
||||
self.complete({});
|
||||
return;
|
||||
}
|
||||
|
||||
state_->reset();
|
||||
checker->reset();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class Executor>
|
||||
class health_checker {
|
||||
private:
|
||||
using promise_type = asio::experimental::promise<void(system::error_code, std::size_t), Executor>;
|
||||
using timer_type =
|
||||
asio::basic_waitable_timer<
|
||||
std::chrono::steady_clock,
|
||||
asio::wait_traits<std::chrono::steady_clock>,
|
||||
Executor>;
|
||||
|
||||
public:
|
||||
health_checker(
|
||||
Executor ex,
|
||||
std::string const& msg,
|
||||
std::chrono::steady_clock::duration interval)
|
||||
: timer_{ex}
|
||||
, interval_{interval}
|
||||
{
|
||||
req_.push("PING", msg);
|
||||
}
|
||||
|
||||
template <
|
||||
class Connection,
|
||||
class CompletionToken = asio::default_completion_token_t<Executor>
|
||||
>
|
||||
auto async_check_health(Connection& conn, CompletionToken token = CompletionToken{})
|
||||
{
|
||||
return asio::async_compose
|
||||
< CompletionToken
|
||||
, void(system::error_code)
|
||||
>(check_health_op<health_checker, Connection>{this, &conn}, token, conn);
|
||||
}
|
||||
|
||||
void reset()
|
||||
{
|
||||
resp_.value().clear();
|
||||
prom_.reset();
|
||||
}
|
||||
|
||||
private:
|
||||
template <class, class> friend class check_health_op;
|
||||
timer_type timer_;
|
||||
std::optional<promise_type> prom_;
|
||||
redis::request req_;
|
||||
redis::generic_response resp_;
|
||||
std::chrono::steady_clock::duration interval_;
|
||||
};
|
||||
|
||||
} // detail
|
||||
|
||||
/** @brief Checks Redis health asynchronously
|
||||
@@ -127,10 +133,10 @@ async_check_health(
|
||||
std::chrono::steady_clock::duration interval = std::chrono::seconds{2},
|
||||
CompletionToken token = CompletionToken{})
|
||||
{
|
||||
return asio::async_compose
|
||||
< CompletionToken
|
||||
, void(system::error_code)
|
||||
>(detail::check_health_op<Connection>{conn, msg, interval}, token, conn);
|
||||
using executor_type = typename Connection::executor_type;
|
||||
using health_checker_type = detail::health_checker<executor_type>;
|
||||
auto checker = std::make_shared<health_checker_type>(conn.get_executor(), msg, interval);
|
||||
return checker->async_check_health(conn, asio::consign(std::move(token), checker));
|
||||
}
|
||||
|
||||
} // boost::redis
|
||||
|
||||
Reference in New Issue
Block a user