2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Removes dependency on asio::promise as it does not compile on windows.

This commit is contained in:
Marcelo Zimbres
2023-03-23 16:49:37 +01:00
parent 7a08588808
commit 5ac4f7e8ad
9 changed files with 501 additions and 69 deletions

View File

@@ -7,9 +7,9 @@
#include <boost/redis/experimental/connector.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/consign.hpp>
#include <iostream>

View File

@@ -11,24 +11,23 @@
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/detail/read_ops.hpp>
#include <boost/asio/experimental/promise.hpp>
#include <boost/asio/experimental/use_promise.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <memory>
#include <chrono>
#include <optional>
namespace boost::redis {
namespace detail {
template <class HealthChecker, class Connection>
class check_health_op {
class ping_op {
public:
HealthChecker* checker = nullptr;
Connection* conn = nullptr;
HealthChecker* checker_ = nullptr;
Connection* conn_ = nullptr;
asio::coroutine coro_{};
template <class Self>
@@ -36,20 +35,98 @@ public:
{
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
{
checker->prom_.emplace(conn->async_exec(checker->req_, checker->resp_, asio::experimental::use_promise));
checker->timer_.expires_after(checker->timeout_);
BOOST_ASIO_CORO_YIELD
checker->timer_.async_wait(std::move(self));
if (ec || is_cancelled(self) || checker->resp_.value().empty()) {
conn->cancel(operation::run);
BOOST_ASIO_CORO_YIELD
std::move(*checker->prom_)(std::move(self));
if (checker_->checker_has_exited_) {
self.complete({});
return;
}
checker->reset();
BOOST_ASIO_CORO_YIELD
conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
BOOST_REDIS_CHECK_OP0(checker_->wait_timer_.cancel();)
// Wait before pinging again.
checker_->ping_timer_.expires_after(checker_->ping_interval_);
BOOST_ASIO_CORO_YIELD
checker_->ping_timer_.async_wait(std::move(self));
BOOST_REDIS_CHECK_OP0(;)
}
}
};
template <class HealthChecker, class Connection>
class check_timeout_op {
public:
HealthChecker* checker_ = nullptr;
Connection* conn_ = nullptr;
asio::coroutine coro_{};
template <class Self>
void operator()(Self& self, system::error_code ec = {})
{
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
{
checker_->wait_timer_.expires_after(2 * checker_->ping_interval_);
BOOST_ASIO_CORO_YIELD
checker_->wait_timer_.async_wait(std::move(self));
BOOST_REDIS_CHECK_OP0(;)
if (!checker_->resp_.has_value()) {
self.complete({});
return;
}
if (checker_->resp_.value().empty()) {
checker_->ping_timer_.cancel();
conn_->cancel(operation::run);
checker_->checker_has_exited_ = true;
self.complete(error::pong_timeout);
return;
}
checker_->resp_.value().clear();
if (checker_->resp_.has_value()) {
checker_->resp_.value().clear();
}
}
}
};
template <class HealthChecker, class Connection>
class check_health_op {
public:
HealthChecker* checker_ = nullptr;
Connection* conn_ = nullptr;
asio::coroutine coro_{};
template <class Self>
void
operator()(
Self& self,
std::array<std::size_t, 2> order = {},
system::error_code ec1 = {},
system::error_code ec2 = {})
{
BOOST_ASIO_CORO_REENTER (coro_)
{
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
[this](auto token) { return checker_->async_ping(*conn_, token); },
[this](auto token) { return checker_->async_check_timeout(*conn_, token);}
).async_wait(
asio::experimental::wait_for_one(),
std::move(self));
if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
switch (order[0]) {
case 0: self.complete(ec1); return;
case 1: self.complete(ec2); return;
default: BOOST_ASSERT(false);
}
}
}
};
@@ -57,7 +134,6 @@ public:
template <class Executor>
class health_checker {
private:
using promise_type = asio::experimental::promise<void(system::error_code, std::size_t), Executor>;
using timer_type =
asio::basic_waitable_timer<
std::chrono::steady_clock,
@@ -68,9 +144,10 @@ public:
health_checker(
Executor ex,
std::string const& msg,
std::chrono::steady_clock::duration interval)
: timer_{ex}
, timeout_{interval}
std::chrono::steady_clock::duration ping_interval)
: ping_timer_{ex}
, wait_timer_{ex}
, ping_interval_{ping_interval}
{
req_.push("PING", msg);
}
@@ -87,26 +164,41 @@ public:
>(check_health_op<health_checker, Connection>{this, &conn}, token, conn);
}
void reset()
{
resp_.value().clear();
prom_.reset();
}
void cancel()
{
timer_.cancel();
if (prom_)
prom_.cancel();
ping_timer_.cancel();
wait_timer_.cancel();
}
private:
template <class Connection, class CompletionToken>
auto async_ping(Connection& conn, CompletionToken token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(ping_op<health_checker, Connection>{this, &conn}, token, conn, ping_timer_);
}
template <class Connection, class CompletionToken>
auto async_check_timeout(Connection& conn, CompletionToken token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(check_timeout_op<health_checker, Connection>{this, &conn}, token, conn, wait_timer_);
}
template <class, class> friend class ping_op;
template <class, class> friend class check_timeout_op;
template <class, class> friend class check_health_op;
timer_type timer_;
std::optional<promise_type> prom_;
timer_type ping_timer_;
timer_type wait_timer_;
redis::request req_;
redis::generic_response resp_;
std::chrono::steady_clock::duration timeout_;
std::chrono::steady_clock::duration ping_interval_;
bool checker_has_exited_ = false;
};
} // detail
@@ -120,7 +212,7 @@ private:
*
* @param conn A connection to the Redis server.
* @param msg The message to be sent with the [PING](https://redis.io/commands/ping/) command. Seting a proper and unique id will help users identify which connections are active.
* @param interval Ping interval.
* @param ping_interval Ping ping_interval.
* @param token The completion token
*
* The completion token must have the following signature
@@ -128,6 +220,9 @@ private:
* @code
* void f(system::error_code);
* @endcode
*
* Completion occurs when a pong response is not receive within two
* times the ping interval.
*/
template <
class Connection,
@@ -137,12 +232,12 @@ auto
async_check_health(
Connection& conn,
std::string const& msg = "Boost.Redis",
std::chrono::steady_clock::duration interval = std::chrono::seconds{2},
std::chrono::steady_clock::duration ping_interval = std::chrono::seconds{2},
CompletionToken token = CompletionToken{})
{
using executor_type = typename Connection::executor_type;
using health_checker_type = detail::health_checker<executor_type>;
auto checker = std::make_shared<health_checker_type>(conn.get_executor(), msg, interval);
auto checker = std::make_shared<health_checker_type>(conn.get_executor(), msg, ping_interval);
return checker->async_check_health(conn, asio::consign(std::move(token), checker));
}

View File

@@ -51,6 +51,7 @@ public:
basic_connection(executor_type ex)
: base_type{ex}
, stream_{ex}
, reconnect_{true}
{}
/// Contructs from a context.
@@ -227,6 +228,22 @@ public:
void reserve(std::size_t read, std::size_t write)
{ base_type::reserve(read, write); }
/** @brief Enable reconnection
*
* This property plays any role only when used with
* `boost::redis::async_run`.
*/
void enable_reconnection() noexcept {reconnect_ = true;}
/** @brief Disable reconnection
*
* This property plays any role only when used with
* `boost::redis::async_run`.
*/
void disable_reconnection() noexcept {reconnect_ = false;}
bool reconnect() const noexcept {return reconnect_;}
private:
using this_type = basic_connection<next_layer_type>;
@@ -244,6 +261,7 @@ private:
auto lowest_layer() noexcept -> auto& { return stream_.lowest_layer(); }
Socket stream_;
bool reconnect_;
};
/** \brief A connection that uses a asio::ip::tcp::socket.

View File

@@ -69,6 +69,9 @@ enum class error
/// Connect timeout
connect_timeout,
/// Connect timeout
pong_timeout,
};
/** \internal

View File

@@ -0,0 +1,313 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_CONNECTOR_HPP
#define BOOST_REDIS_CONNECTOR_HPP
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/check_health.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/connection.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <string>
#include <chrono>
namespace boost::redis::experimental
{
struct connect_config {
address addr = address{"127.0.0.1", "6379"};
std::string username;
std::string password;
std::string clientname = "Boost.Redis";
std::string health_check_id = "Boost.Redis";
std::chrono::steady_clock::duration resolve_timeout = std::chrono::seconds{10};
std::chrono::steady_clock::duration connect_timeout = std::chrono::seconds{10};
std::chrono::steady_clock::duration health_check_timeout = std::chrono::seconds{2};
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};
};
namespace detail
{
template <class Connector, class Connection>
struct hello_op {
Connector* ctor_ = nullptr;
Connection* conn_ = nullptr;
asio::coroutine coro_{};
template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER (coro_)
{
ctor_->req_hello_.clear();
ctor_->resp_hello_.value().clear();
ctor_->add_hello();
BOOST_ASIO_CORO_YIELD
conn_->async_exec(ctor_->req_hello_, ctor_->resp_hello_, std::move(self));
ctor_->logger_.on_hello(ec);
BOOST_REDIS_CHECK_OP0(conn_->cancel(redis::operation::run);)
if (ctor_->resp_hello_.has_error()) {
conn_->cancel(redis::operation::run);
switch (ctor_->resp_hello_.error().data_type) {
case resp3::type::simple_error:
self.complete(error::resp3_simple_error);
break;
case resp3::type::blob_error:
self.complete(error::resp3_blob_error);
break;
default: BOOST_ASSERT_MSG(false, "Unexpects error data type.");
}
} else {
self.complete({});
}
}
}
};
template <class Connector, class Connection>
struct run_check_exec_op {
Connector* ctor_ = nullptr;
Connection* conn_ = nullptr;
asio::coroutine coro_{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, system::error_code ec1 = {}
, system::error_code ec2 = {}
, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER (coro_)
{
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
[this](auto token) { return ctor_->async_run_check(*conn_, token); },
[this](auto token) { return ctor_->async_hello(*conn_, token); }
).async_wait(
asio::experimental::wait_for_all(),
std::move(self)
);
if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
// TODO: Which op should we use to complete?
switch (order[0]) {
case 0: self.complete(ec1); break;
case 1: self.complete(ec2); break;
default: BOOST_ASSERT(false);
}
}
}
};
template <class Connector, class Connection>
struct run_check_op {
Connector* ctor_ = nullptr;
Connection* conn_ = nullptr;
asio::coroutine coro_{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, system::error_code ec1 = {}
, system::error_code ec2 = {})
{
BOOST_ASIO_CORO_REENTER (coro_)
{
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
[this](auto token)
{
return ctor_->runner_.async_run(*conn_, ctor_->cfg_.resolve_timeout, ctor_->cfg_.connect_timeout, token);
},
[this](auto token)
{
return ctor_->health_checker_.async_check_health(*conn_, token);
}
).async_wait(
asio::experimental::wait_for_one(),
std::move(self));
if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
switch (order[0]) {
case 0: self.complete(ec1); break;
case 1: self.complete(ec2); break;
default: BOOST_ASSERT(false);
}
}
}
};
template <class Connector, class Connection>
struct connect_op {
Connector* ctor_ = nullptr;
Connection* conn_ = nullptr;
asio::coroutine coro_{};
template <class Self>
void operator()(Self& self, system::error_code ec = {})
{
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
{
BOOST_ASIO_CORO_YIELD
ctor_->async_run_check_exec(*conn_, std::move(self));
ctor_->logger_.on_connection_lost();
if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
conn_->reset_stream();
if (!conn_->reconnect()) {
self.complete({});
return;
}
// Wait some time before trying to reconnect.
ctor_->reconnect_wait_timer_.expires_after(ctor_->cfg_.reconnect_wait_interval);
BOOST_ASIO_CORO_YIELD
ctor_->reconnect_wait_timer_.async_wait(std::move(self));
BOOST_REDIS_CHECK_OP0(;)
}
}
};
template <class Executor, class Logger>
class connector {
public:
connector(Executor ex, connect_config cfg, Logger l)
: runner_{ex, cfg.addr, l}
, health_checker_{ex, cfg.health_check_id, cfg.health_check_timeout}
, reconnect_wait_timer_{ex}
, cfg_{cfg}
, logger_{l}
{ }
template <
class Connection,
class CompletionToken = asio::default_completion_token_t<Executor>
>
auto async_connect(Connection& conn, CompletionToken token = CompletionToken{})
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(connect_op<connector, Connection>{this, &conn}, token, conn);
}
void cancel()
{
runner_.cancel();
health_checker_.cancel();
reconnect_wait_timer_.cancel();
}
private:
using runner_type = redis::detail::runner<Executor, Logger>;
using health_checker_type = redis::detail::health_checker<Executor>;
using timer_type = typename runner_type::timer_type;
template <class, class> friend struct connect_op;
template <class, class> friend struct run_check_exec_op;
template <class, class> friend struct run_check_op;
template <class, class> friend struct hello_op;
template <class Connection, class CompletionToken>
auto async_run_check(Connection& conn, CompletionToken token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(run_check_op<connector, Connection>{this, &conn}, token, conn);
}
template <class Connection, class CompletionToken>
auto async_run_check_exec(Connection& conn, CompletionToken token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(run_check_exec_op<connector, Connection>{this, &conn}, token, conn);
}
template <class Connection, class CompletionToken>
auto async_hello(Connection& conn, CompletionToken token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(hello_op<connector, Connection>{this, &conn}, token, conn);
}
void add_hello()
{
if (!cfg_.username.empty() && !cfg_.password.empty() && !cfg_.clientname.empty())
req_hello_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password, "SETNAME", cfg_.clientname);
else if (cfg_.username.empty() && cfg_.password.empty() && cfg_.clientname.empty())
req_hello_.push("HELLO", "3");
else if (cfg_.clientname.empty())
req_hello_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password);
else
req_hello_.push("HELLO", "3", "SETNAME", cfg_.clientname);
// Subscribe to channels in the same request that sends HELLO
// because it has priority over all other requests.
// TODO: Subscribe to actual channels.
req_hello_.push("SUBSCRIBE", "channel");
}
runner_type runner_;
health_checker_type health_checker_;
timer_type reconnect_wait_timer_;
request req_hello_;
generic_response resp_hello_;
connect_config cfg_;
Logger logger_;
};
} // detail
template <
class Socket,
class Logger = logger,
class CompletionToken = asio::default_completion_token_t<typename Socket::executor_type>
>
auto
async_connect(
basic_connection<Socket>& conn,
connect_config cfg = connect_config{},
Logger l = logger{},
CompletionToken token = CompletionToken{})
{
using executor_type = typename Socket::executor_type;
using connector_type = detail::connector<executor_type, Logger>;
auto ctor = std::make_shared<connector_type>(conn.get_executor(), cfg, l);
return ctor->async_connect(conn, asio::consign(std::move(token), ctor));
}
} // boost::redis::experimental
#endif // BOOST_REDIS_CONNECTOR_HPP

View File

@@ -40,6 +40,7 @@ struct error_category_impl : system::error_category {
case error::not_connected: return "Not connected.";
case error::resolve_timeout: return "Resolve timeout.";
case error::connect_timeout: return "Connect timeout.";
case error::pong_timeout: return "Pong timeout.";
default: BOOST_ASSERT(false); return "Boost.Redis error.";
}
}

View File

@@ -17,13 +17,13 @@ namespace boost::redis {
// TODO: Implement filter.
class logger {
public:
void on_resolve(system::error_code const& ec, asio::ip::tcp::resolver::results_type const& res)
void on_resolve(system::error_code const& ec, asio::ip::tcp::resolver::results_type const&)
{
// TODO: Print the endpoints
std::clog << "on_resolve: " << ec.message() << std::endl;
}
void on_connect(system::error_code const& ec, asio::ip::tcp::endpoint const& ep)
void on_connect(system::error_code const& ec, asio::ip::tcp::endpoint const&)
{
// TODO: Print the endpoint
std::clog << "on_connect: " << ec.message() << std::endl;

View File

@@ -85,8 +85,8 @@ BOOST_AUTO_TEST_CASE(check_health)
bool seen = false;
async_check_health(conn, msg, interval, [&](auto ec) {
BOOST_TEST(!ec);
std::cout << "async_check_health: completed." << std::endl;
BOOST_CHECK_EQUAL(ec, boost::redis::error::pong_timeout);
std::cout << "async_check_health: completed: " << ec.message() << std::endl;
seen = true;
});
@@ -101,7 +101,7 @@ BOOST_AUTO_TEST_CASE(check_health)
request req2;
req2.push("HELLO", "3");
req2.push("CLIENT", "PAUSE", "3000", "ALL");
req2.push("CLIENT", "PAUSE", "5000", "ALL");
generic_response resp;
push_callback{&conn, &conn2, &resp, &req2}(); // Starts reading pushes.

View File

@@ -6,33 +6,40 @@
// Must come before any asio header, otherwise build fails on msvc.
#include <boost/redis/run.hpp>
#include <boost/redis/check_health.hpp>
#include <boost/redis/experimental/connector.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <tuple>
#include <iostream>
#include "../examples/start.hpp"
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
namespace net = boost::asio;
using namespace net::experimental::awaitable_operators;
namespace redis = boost::redis;
using steady_timer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::async_check_health;
using boost::redis::async_run;
using boost::redis::address;
using redis::request;
using redis::response;
using redis::ignore;
using redis::address;
using redis::logger;
using redis::experimental::async_connect;
using redis::experimental::connect_config;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using namespace std::chrono_literals;
// Push consumer
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
for (;;)
co_await conn->async_receive();
boost::system::error_code ec;
while (!ec)
co_await conn->async_receive(ignore, net::redirect_error(net::use_awaitable, ec));
}
auto periodic_task(std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -56,27 +63,22 @@ auto periodic_task(std::shared_ptr<connection> conn) -> net::awaitable<void>
}
std::cout << "Periodic task done!" << std::endl;
conn->disable_reconnection();
conn->cancel(redis::operation::run);
conn->cancel(redis::operation::receive);
}
auto co_main(address const& addr) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
steady_timer timer{ex};
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
connect_config cfg;
cfg.addr = addr;
// The loop will reconnect on connection lost. To exit type Ctrl-C twice.
for (int i = 0; i < 10; ++i) {
co_await ((async_run(*conn, addr) || receiver(conn) || async_check_health(*conn) || periodic_task(conn)) &&
conn->async_exec(req));
conn->reset_stream();
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait();
}
net::co_spawn(ex, receiver(conn), net::detached);
net::co_spawn(ex, periodic_task(conn), net::detached);
redis::experimental::async_connect(*conn, cfg, logger{}, net::consign(net::detached, conn));
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)