diff --git a/CHANGELOG.md b/CHANGELOG.md index a15ce060..f2bacb29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,14 +1,13 @@ # Changelog -## master +## v0.3.0 -* Adds `experimental::exec` and `experimental::receive_event` +* Adds `experimental::exec` and `receive_event` functions to offer a thread safe and synchronous way of executing requests. See `intro_sync.cpp` and `subscriber_sync.cpp` for examples. -* `connection::async_read_push` has been renamed to - `async_receive_event`. +* `connection::async_read_push` was renamed to `async_receive_event`. * Uses `async_receive_event` to communicate internal events to the user, see subscriber.cpp and `connection::event`. @@ -17,6 +16,11 @@ similar to Boost libraries. Users should now replace `-I/aedis-path` with `-I/aedis-path/include` in the compiler flags. +* AUTH and HELLO commands are sent automatically. This change was + necessary to implement reconnection. + +* Adds support for reconnection. See connection::enable reconnect. + * Fixes a bug in the `connection::async_exec(host, port)` overload that was causing crashes reconnection. diff --git a/Makefile.am b/Makefile.am index e5ac1879..79e9161e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -20,7 +20,7 @@ check_PROGRAMS += intro_sync check_PROGRAMS += containers check_PROGRAMS += serialization check_PROGRAMS += test_low_level -check_PROGRAMS += test_high_level +check_PROGRAMS += test_connection EXTRA_PROGRAMS = if HAVE_COROUTINES @@ -44,7 +44,7 @@ intro_SOURCES = $(top_srcdir)/examples/intro.cpp intro_sync_SOURCES = $(top_srcdir)/examples/intro_sync.cpp containers_SOURCES = $(top_srcdir)/examples/containers.cpp serialization_SOURCES = $(top_srcdir)/examples/serialization.cpp -test_high_level_SOURCES = $(top_srcdir)/tests/high_level.cpp +test_connection_SOURCES = $(top_srcdir)/tests/connection.cpp subscriber_sync_SOURCES = $(top_srcdir)/examples/subscriber_sync.cpp if HAVE_COROUTINES subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp diff --git a/configure.ac b/configure.ac index 87c32a32..4a61223f 100644 --- a/configure.ac +++ b/configure.ac @@ -1,5 +1,5 @@ AC_PREREQ([2.69]) -AC_INIT([Aedis], [0.2.1], [mzimbres@gmail.com]) +AC_INIT([Aedis], [0.3.0], [mzimbres@gmail.com]) AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_HEADERS([config.h]) AC_CONFIG_SRCDIR(include/aedis.hpp) diff --git a/doc/Doxyfile.in b/doc/Doxyfile.in index 906eca49..11a202dd 100644 --- a/doc/Doxyfile.in +++ b/doc/Doxyfile.in @@ -823,7 +823,7 @@ WARN_LOGFILE = # spaces. See also FILE_PATTERNS and EXTENSION_MAPPING # Note: If this tag is empty the current directory is searched. -INPUT = include benchmarks/benchmarks.md . examples +INPUT = include benchmarks/benchmarks.md CHANGELOG.md examples # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses diff --git a/examples/subscriber.cpp b/examples/subscriber.cpp index a905608c..4a33dfb7 100644 --- a/examples/subscriber.cpp +++ b/examples/subscriber.cpp @@ -41,7 +41,7 @@ using net::experimental::as_tuple; * > CLIENT kill TYPE pubsub */ -net::awaitable reader(std::shared_ptr db) +net::awaitable receiver(std::shared_ptr db) { request req; req.push("SUBSCRIBE", "channel"); @@ -54,27 +54,17 @@ net::awaitable reader(std::shared_ptr db) switch (ev) { case connection::event::push: print_push(resp); + resp.clear(); break; case connection::event::hello: + // Subscribes to the channels again after stablishing a new + // connection. co_await db->async_exec(req); break; default:; } - - resp.clear(); - } -} - -net::awaitable reconnect(std::shared_ptr db) -{ - net::steady_timer timer{co_await net::this_coro::executor}; - for (;;) { - co_await db->async_run(as_tuple(net::use_awaitable)); - // Waits one second and tries again. - timer.expires_after(std::chrono::seconds{1}); - co_await timer.async_wait(net::use_awaitable); } } @@ -83,9 +73,12 @@ int main() try { net::io_context ioc; auto db = std::make_shared(ioc); + db->get_config().enable_events = true; - net::co_spawn(ioc, reader(db), net::detached); - net::co_spawn(ioc, reconnect(db), net::detached); + db->get_config().enable_reconnect = true; + + net::co_spawn(ioc, receiver(db), net::detached); + db->async_run(net::detached); net::signal_set signals(ioc, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ ioc.stop(); }); ioc.run(); diff --git a/examples/subscriber_sync.cpp b/examples/subscriber_sync.cpp index 2e5021a4..450be56f 100644 --- a/examples/subscriber_sync.cpp +++ b/examples/subscriber_sync.cpp @@ -29,7 +29,9 @@ int main() try { net::io_context ioc{1}; connection conn{ioc}; + conn.get_config().enable_events = true; + conn.get_config().enable_reconnect = true; std::thread thread{[&]() { conn.async_run(net::detached); @@ -45,16 +47,17 @@ int main() switch (ev) { case connection::event::push: print_push(resp); + resp.clear(); break; case connection::event::hello: + // Subscribes to the channels again after stablishing a + // new connection. exec(conn, req); break; default:; } - - resp.clear(); } thread.join(); diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index 6da62fb4..37d1bbc8 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -73,6 +73,9 @@ public: /// Time interval ping operations. std::chrono::milliseconds ping_interval = std::chrono::seconds{1}; + /// Time waited before trying a reconnection (see enable reconnect). + std::chrono::milliseconds reconnect_interval = std::chrono::seconds{1}; + /// The maximum size allowed on read operations. std::size_t max_read_size = (std::numeric_limits::max)(); @@ -81,6 +84,9 @@ public: /// Enable events bool enable_events = false; + + /// Enable automatic reconnection (see also reconnect_interval). + bool enable_reconnect = false; }; /// Events communicated through \c async_receive_event. @@ -152,8 +158,6 @@ public: * * For an example see echo_server.cpp. * - * \param host Redis address. - * \param port Redis port. * \param token Completion token. * * The completion token must have the following signature @@ -211,8 +215,6 @@ public: * single function. This function is useful for users that want to * send a single request to the server and close it. * - * \param host Address of the Redis server. - * \param port Port of the Redis server. * \param req Request object. * \param adapter Response adapter. * \param token Asio completion token. @@ -253,7 +255,7 @@ public: * have the following signature * * @code - * void f(boost::system::error_code, std::size_t); + * void f(boost::system::error_code, event); * @endcode * * Where the second parameter is the size of the response that has @@ -335,9 +337,8 @@ public: reqs_.erase(point, std::end(reqs_)); } - /** @brief TODO - */ - void cancel_receiver() + /// Cancels the event receiver. + void cancel_event_receiver() { push_channel_.cancel(); } @@ -366,6 +367,7 @@ private: template friend struct detail::writer_op; template friend struct detail::ping_op; template friend struct detail::run_op; + template friend struct detail::run_one_op; template friend struct detail::exec_op; template friend struct detail::exec_read_op; template friend struct detail::runexec_op; @@ -375,6 +377,15 @@ private: template friend struct detail::start_op; template friend struct detail::send_receive_op; + template + auto async_run_one(CompletionToken token = CompletionToken{}) + { + return boost::asio::async_compose + < CompletionToken + , void(boost::system::error_code) + >(detail::run_one_op{this}, token, resv_); + } + void cancel_push_requests() { auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { diff --git a/include/aedis/detail/connection_ops.hpp b/include/aedis/detail/connection_ops.hpp index c484b36a..abf5ac18 100644 --- a/include/aedis/detail/connection_ops.hpp +++ b/include/aedis/detail/connection_ops.hpp @@ -381,7 +381,7 @@ struct start_op { }; template -struct run_op { +struct run_one_op { Conn* conn; boost::asio::coroutine coro{}; @@ -469,6 +469,36 @@ struct run_op { } }; +template +struct run_op { + Conn* conn; + boost::asio::coroutine coro{}; + + template + void operator()( + Self& self, + boost::system::error_code ec = {}, + std::size_t = 0) + { + reenter (coro) for(;;) + { + yield conn->async_run_one(std::move(self)); + + if (!conn->cfg_.enable_reconnect) { + self.complete(ec); + return; + } + + // Consider communicating the return of async_run_one as an + // event here. + + conn->ping_timer_.expires_after(conn->cfg_.reconnect_interval); + yield conn->ping_timer_.async_wait(std::move(self)); + } + } +}; + + template struct writer_op { Conn* conn; diff --git a/tests/high_level.cpp b/tests/connection.cpp similarity index 87% rename from tests/high_level.cpp rename to tests/connection.cpp index 88384a53..8d4419fa 100644 --- a/tests/high_level.cpp +++ b/tests/connection.cpp @@ -226,7 +226,7 @@ void test_push_is_received1(connection::config const& cfg) db->async_run(req, aedis::adapt(), [db](auto ec, auto){ expect_no_error(ec, "test_push_is_received1"); - db->cancel_receiver(); + db->cancel_event_receiver(); }); bool received = false; @@ -266,7 +266,7 @@ void test_push_is_received2(connection::config const& cfg) db->async_run([db](auto ec, auto...) { expect_error(ec, net::error::misc_errors::eof, "test_push_is_received2"); - db->cancel_receiver(); + db->cancel_event_receiver(); }); bool received = false; @@ -279,28 +279,30 @@ void test_push_is_received2(connection::config const& cfg) expect_true(received); } -net::awaitable run5(std::shared_ptr db) +net::awaitable test_reconnect_impl(std::shared_ptr db) { - { - request req; - req.push("QUIT"); - db->async_exec(req, aedis::adapt(), [](auto ec, auto){ - expect_no_error(ec, "test_reconnect"); - }); + request req; + req.push("QUIT"); - auto [ec] = co_await db->async_run(as_tuple(net::use_awaitable)); - expect_error(ec, net::error::misc_errors::eof, "run5a"); - } + for (auto i = 0;;) { + auto ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable); + expect_eq(ev, connection::event::resolve, "test_reconnect."); - { - request req; - req.push("QUIT"); - db->async_exec(req, aedis::adapt(), [](auto ec, auto){ - expect_no_error(ec, "test_reconnect"); - }); + ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable); + expect_eq(ev, connection::event::connect, "test_reconnect."); - auto [ec] = co_await db->async_run(as_tuple(net::use_awaitable)); - expect_error(ec, net::error::misc_errors::eof, "run5a"); + ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable); + expect_eq(ev, connection::event::hello, "test_reconnect."); + + co_await db->async_exec(req, aedis::adapt(), net::use_awaitable); + + // Test 5 reconnetions and returns. + + ++i; + if (i == 5) { + db->get_config().enable_reconnect = false; + co_return; + } } co_return; @@ -309,13 +311,21 @@ net::awaitable run5(std::shared_ptr db) // Test whether the client works after a reconnect. void test_reconnect() { - std::cout << "test_reconnect" << std::endl; + std::cout << "Start: test_reconnect" << std::endl; net::io_context ioc; auto db = std::make_shared(ioc.get_executor()); + db->get_config().enable_events = true; + db->get_config().enable_reconnect = true; + db->get_config().reconnect_interval = std::chrono::milliseconds{100}; + + net::co_spawn(ioc, test_reconnect_impl(db), net::detached); + + db->async_run([](auto ec) { + expect_error(ec, net::error::misc_errors::eof, "test_reconnect."); + }); - net::co_spawn(ioc, run5(db), net::detached); ioc.run(); - std::cout << "Success: test_reconnect()" << std::endl; + std::cout << "End: test_reconnect()" << std::endl; } net::awaitable @@ -363,7 +373,7 @@ void test_push_many_subscribes(connection::config const& cfg) db->async_run([db](auto ec, auto...) { expect_error(ec, net::error::misc_errors::eof, "test_push_many_subscribes"); - db->cancel_receiver(); + db->cancel_event_receiver(); }); net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);