2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Support for reconnection.

This commit is contained in:
Marcelo Zimbres
2022-08-06 17:46:53 +02:00
parent 54d448cad4
commit 37ab1e7387
9 changed files with 110 additions and 59 deletions

View File

@@ -1,14 +1,13 @@
# Changelog
## master
## v0.3.0
* Adds `experimental::exec` and `experimental::receive_event`
* Adds `experimental::exec` and `receive_event`
functions to offer a thread safe and synchronous way of executing
requests. See `intro_sync.cpp` and `subscriber_sync.cpp` for
examples.
* `connection::async_read_push` has been renamed to
`async_receive_event`.
* `connection::async_read_push` was renamed to `async_receive_event`.
* Uses `async_receive_event` to communicate internal events to the
user, see subscriber.cpp and `connection::event`.
@@ -17,6 +16,11 @@
similar to Boost libraries. Users should now replace `-I/aedis-path`
with `-I/aedis-path/include` in the compiler flags.
* AUTH and HELLO commands are sent automatically. This change was
necessary to implement reconnection.
* Adds support for reconnection. See connection::enable reconnect.
* Fixes a bug in the `connection::async_exec(host, port)` overload
that was causing crashes reconnection.

View File

@@ -20,7 +20,7 @@ check_PROGRAMS += intro_sync
check_PROGRAMS += containers
check_PROGRAMS += serialization
check_PROGRAMS += test_low_level
check_PROGRAMS += test_high_level
check_PROGRAMS += test_connection
EXTRA_PROGRAMS =
if HAVE_COROUTINES
@@ -44,7 +44,7 @@ intro_SOURCES = $(top_srcdir)/examples/intro.cpp
intro_sync_SOURCES = $(top_srcdir)/examples/intro_sync.cpp
containers_SOURCES = $(top_srcdir)/examples/containers.cpp
serialization_SOURCES = $(top_srcdir)/examples/serialization.cpp
test_high_level_SOURCES = $(top_srcdir)/tests/high_level.cpp
test_connection_SOURCES = $(top_srcdir)/tests/connection.cpp
subscriber_sync_SOURCES = $(top_srcdir)/examples/subscriber_sync.cpp
if HAVE_COROUTINES
subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp

View File

@@ -1,5 +1,5 @@
AC_PREREQ([2.69])
AC_INIT([Aedis], [0.2.1], [mzimbres@gmail.com])
AC_INIT([Aedis], [0.3.0], [mzimbres@gmail.com])
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_HEADERS([config.h])
AC_CONFIG_SRCDIR(include/aedis.hpp)

View File

@@ -823,7 +823,7 @@ WARN_LOGFILE =
# spaces. See also FILE_PATTERNS and EXTENSION_MAPPING
# Note: If this tag is empty the current directory is searched.
INPUT = include benchmarks/benchmarks.md . examples
INPUT = include benchmarks/benchmarks.md CHANGELOG.md examples
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

View File

@@ -41,7 +41,7 @@ using net::experimental::as_tuple;
* > CLIENT kill TYPE pubsub
*/
net::awaitable<void> reader(std::shared_ptr<connection> db)
net::awaitable<void> receiver(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "channel");
@@ -54,27 +54,17 @@ net::awaitable<void> reader(std::shared_ptr<connection> db)
switch (ev) {
case connection::event::push:
print_push(resp);
resp.clear();
break;
case connection::event::hello:
// Subscribes to the channels again after stablishing a new
// connection.
co_await db->async_exec(req);
break;
default:;
}
resp.clear();
}
}
net::awaitable<void> reconnect(std::shared_ptr<connection> db)
{
net::steady_timer timer{co_await net::this_coro::executor};
for (;;) {
co_await db->async_run(as_tuple(net::use_awaitable));
// Waits one second and tries again.
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait(net::use_awaitable);
}
}
@@ -83,9 +73,12 @@ int main()
try {
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
db->get_config().enable_events = true;
net::co_spawn(ioc, reader(db), net::detached);
net::co_spawn(ioc, reconnect(db), net::detached);
db->get_config().enable_reconnect = true;
net::co_spawn(ioc, receiver(db), net::detached);
db->async_run(net::detached);
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });
ioc.run();

View File

@@ -29,7 +29,9 @@ int main()
try {
net::io_context ioc{1};
connection conn{ioc};
conn.get_config().enable_events = true;
conn.get_config().enable_reconnect = true;
std::thread thread{[&]() {
conn.async_run(net::detached);
@@ -45,16 +47,17 @@ int main()
switch (ev) {
case connection::event::push:
print_push(resp);
resp.clear();
break;
case connection::event::hello:
// Subscribes to the channels again after stablishing a
// new connection.
exec(conn, req);
break;
default:;
}
resp.clear();
}
thread.join();

View File

@@ -73,6 +73,9 @@ public:
/// Time interval ping operations.
std::chrono::milliseconds ping_interval = std::chrono::seconds{1};
/// Time waited before trying a reconnection (see enable reconnect).
std::chrono::milliseconds reconnect_interval = std::chrono::seconds{1};
/// The maximum size allowed on read operations.
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
@@ -81,6 +84,9 @@ public:
/// Enable events
bool enable_events = false;
/// Enable automatic reconnection (see also reconnect_interval).
bool enable_reconnect = false;
};
/// Events communicated through \c async_receive_event.
@@ -152,8 +158,6 @@ public:
*
* For an example see echo_server.cpp.
*
* \param host Redis address.
* \param port Redis port.
* \param token Completion token.
*
* The completion token must have the following signature
@@ -211,8 +215,6 @@ public:
* single function. This function is useful for users that want to
* send a single request to the server and close it.
*
* \param host Address of the Redis server.
* \param port Port of the Redis server.
* \param req Request object.
* \param adapter Response adapter.
* \param token Asio completion token.
@@ -253,7 +255,7 @@ public:
* have the following signature
*
* @code
* void f(boost::system::error_code, std::size_t);
* void f(boost::system::error_code, event);
* @endcode
*
* Where the second parameter is the size of the response that has
@@ -335,9 +337,8 @@ public:
reqs_.erase(point, std::end(reqs_));
}
/** @brief TODO
*/
void cancel_receiver()
/// Cancels the event receiver.
void cancel_event_receiver()
{
push_channel_.cancel();
}
@@ -366,6 +367,7 @@ private:
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::ping_op;
template <class T> friend struct detail::run_op;
template <class T> friend struct detail::run_one_op;
template <class T, class U> friend struct detail::exec_op;
template <class T, class U> friend struct detail::exec_read_op;
template <class T, class U> friend struct detail::runexec_op;
@@ -375,6 +377,15 @@ private:
template <class T> friend struct detail::start_op;
template <class T> friend struct detail::send_receive_op;
template <class CompletionToken = default_completion_token_type>
auto async_run_one(CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::run_one_op<connection>{this}, token, resv_);
}
void cancel_push_requests()
{
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {

View File

@@ -381,7 +381,7 @@ struct start_op {
};
template <class Conn>
struct run_op {
struct run_one_op {
Conn* conn;
boost::asio::coroutine coro{};
@@ -469,6 +469,36 @@ struct run_op {
}
};
template <class Conn>
struct run_op {
Conn* conn;
boost::asio::coroutine coro{};
template <class Self>
void operator()(
Self& self,
boost::system::error_code ec = {},
std::size_t = 0)
{
reenter (coro) for(;;)
{
yield conn->async_run_one(std::move(self));
if (!conn->cfg_.enable_reconnect) {
self.complete(ec);
return;
}
// Consider communicating the return of async_run_one as an
// event here.
conn->ping_timer_.expires_after(conn->cfg_.reconnect_interval);
yield conn->ping_timer_.async_wait(std::move(self));
}
}
};
template <class Conn>
struct writer_op {
Conn* conn;

View File

@@ -226,7 +226,7 @@ void test_push_is_received1(connection::config const& cfg)
db->async_run(req, aedis::adapt(), [db](auto ec, auto){
expect_no_error(ec, "test_push_is_received1");
db->cancel_receiver();
db->cancel_event_receiver();
});
bool received = false;
@@ -266,7 +266,7 @@ void test_push_is_received2(connection::config const& cfg)
db->async_run([db](auto ec, auto...) {
expect_error(ec, net::error::misc_errors::eof, "test_push_is_received2");
db->cancel_receiver();
db->cancel_event_receiver();
});
bool received = false;
@@ -279,28 +279,30 @@ void test_push_is_received2(connection::config const& cfg)
expect_true(received);
}
net::awaitable<void> run5(std::shared_ptr<connection> db)
net::awaitable<void> test_reconnect_impl(std::shared_ptr<connection> db)
{
{
request req;
req.push("QUIT");
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_reconnect");
});
request req;
req.push("QUIT");
auto [ec] = co_await db->async_run(as_tuple(net::use_awaitable));
expect_error(ec, net::error::misc_errors::eof, "run5a");
}
for (auto i = 0;;) {
auto ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::resolve, "test_reconnect.");
{
request req;
req.push("QUIT");
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_reconnect");
});
ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::connect, "test_reconnect.");
auto [ec] = co_await db->async_run(as_tuple(net::use_awaitable));
expect_error(ec, net::error::misc_errors::eof, "run5a");
ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::hello, "test_reconnect.");
co_await db->async_exec(req, aedis::adapt(), net::use_awaitable);
// Test 5 reconnetions and returns.
++i;
if (i == 5) {
db->get_config().enable_reconnect = false;
co_return;
}
}
co_return;
@@ -309,13 +311,21 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
// Test whether the client works after a reconnect.
void test_reconnect()
{
std::cout << "test_reconnect" << std::endl;
std::cout << "Start: test_reconnect" << std::endl;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor());
db->get_config().enable_events = true;
db->get_config().enable_reconnect = true;
db->get_config().reconnect_interval = std::chrono::milliseconds{100};
net::co_spawn(ioc, test_reconnect_impl(db), net::detached);
db->async_run([](auto ec) {
expect_error(ec, net::error::misc_errors::eof, "test_reconnect.");
});
net::co_spawn(ioc, run5(db), net::detached);
ioc.run();
std::cout << "Success: test_reconnect()" << std::endl;
std::cout << "End: test_reconnect()" << std::endl;
}
net::awaitable<void>
@@ -363,7 +373,7 @@ void test_push_many_subscribes(connection::config const& cfg)
db->async_run([db](auto ec, auto...) {
expect_error(ec, net::error::misc_errors::eof, "test_push_many_subscribes");
db->cancel_receiver();
db->cancel_event_receiver();
});
net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);