From 54d448cad4d9524f80da73c596e1121141fa47fb Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 6 Aug 2022 12:36:13 +0200 Subject: [PATCH] Progresses with connection events. --- CHANGELOG.md | 26 ++++++++++------- README.md | 17 +++++++++++- doc/Doxyfile.in | 2 +- examples/chat_room.cpp | 29 ++++++++++++------- examples/intro_sync.cpp | 14 ++++------ examples/subscriber.cpp | 17 +++++------- examples/subscriber_sync.cpp | 37 +++++++++++++++---------- include/aedis.hpp | 2 +- include/aedis/connection.hpp | 17 +++++++++--- include/aedis/detail/connection_ops.hpp | 17 +++++++++--- include/aedis/experimental/sync.hpp | 24 ++++++++++------ tests/high_level.cpp | 6 ++-- 12 files changed, 131 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bec739db..a15ce060 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,24 +1,29 @@ # Changelog -## v0.2.2 +## master -* `connection::async_read_push` has been renamed to `async_receive`. +* Adds `experimental::exec` and `experimental::receive_event` + functions to offer a thread safe and synchronous way of executing + requests. See `intro_sync.cpp` and `subscriber_sync.cpp` for + examples. + +* `connection::async_read_push` has been renamed to + `async_receive_event`. + +* Uses `async_receive_event` to communicate internal events to the + user, see subscriber.cpp and `connection::event`. * The `aedis` directory has been moved to `include` to look more similar to Boost libraries. Users should now replace `-I/aedis-path` with `-I/aedis-path/include` in the compiler flags. -* Adds `experimental::exec` functions to offer a thread safe and - synchronous way of executing requests. See `intro_sync.cpp` and - `subscriber_sync.cpp` for an example. - * Fixes a bug in the `connection::async_exec(host, port)` overload - that was causing crashes on reconnect. + that was causing crashes reconnection. * Fixes the executor usage in the connection class. Before theses - changes it was only supporting `any_io_executor`. + changes it was imposing `any_io_executor` on users. -* `connection::async_receiver` is not cancelled anymore when +* `connection::async_receiver_event` is not cancelled anymore when `connection::async_run` exits. This change simplifies the implementation failover operations. @@ -31,7 +36,8 @@ * Many simplifications in the `chat_room` example. -* Fixes build in clang the compilers and makes some improvements in the documentation. +* Fixes build in clang the compilers and makes some improvements in + the documentation. ##v0.2.1 diff --git a/README.md b/README.md index 47c17e25..270faaa5 100644 --- a/README.md +++ b/README.md @@ -1 +1,16 @@ -See the official github-pages for documentation: https://mzimbres.github.io/aedis +An async redis client designed for performance and scalability + +### License + +Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). + +### Build Status + +Branch | GH Actions | +:-------------: | ---------- | +[`master`](https://github.com/mzimbres/aedis/tree/master) | [![CI](https://github.com/mzimbres/aedis/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/mzimbres/aedis/actions/workflows/ci.yml) + +### More information + +* See the official github-pages for documentation: https://mzimbres.github.io/aedis + diff --git a/doc/Doxyfile.in b/doc/Doxyfile.in index a2c72069..906eca49 100644 --- a/doc/Doxyfile.in +++ b/doc/Doxyfile.in @@ -823,7 +823,7 @@ WARN_LOGFILE = # spaces. See also FILE_PATTERNS and EXTENSION_MAPPING # Note: If this tag is empty the current directory is searched. -INPUT = include benchmarks/benchmarks.md examples +INPUT = include benchmarks/benchmarks.md . examples # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses diff --git a/examples/chat_room.cpp b/examples/chat_room.cpp index 39e21170..867f6f1a 100644 --- a/examples/chat_room.cpp +++ b/examples/chat_room.cpp @@ -9,6 +9,7 @@ #include #include #include "unistd.h" +#include "print.hpp" // Include this in no more than one .cpp file. #include @@ -16,7 +17,6 @@ #if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR) namespace net = boost::asio; -using aedis::adapt; using aedis::resp3::request; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t; @@ -33,17 +33,23 @@ using response_type = std::vector>; net::awaitable reader(std::shared_ptr db) { - try { - request req; - req.push("SUBSCRIBE", "chat-channel"); - co_await db->async_exec(req); + request req; + req.push("SUBSCRIBE", "chat-channel"); - for (response_type resp;;) { - co_await db->async_receive(adapt(resp)); - std::cout << "> " << resp.at(3).value; - resp.clear(); + for (response_type resp;;) { + auto const ev = co_await db->async_receive_event(aedis::adapt(resp)); + switch (ev) { + case connection::event::push: + print_push(resp); + break; + + case connection::event::hello: + co_await db->async_exec(req); + break; + + default:; } - } catch (std::exception const&) { + resp.clear(); } } @@ -70,6 +76,7 @@ int main() net::io_context ioc{1}; net::posix::stream_descriptor in{ioc, ::dup(STDIN_FILENO)}; auto db = std::make_shared(ioc); + db->get_config().enable_events = true; co_spawn(ioc, run(in, db), net::detached); co_spawn(ioc, reader(db), net::detached); @@ -77,6 +84,8 @@ int main() std::cout << ec.message() << std::endl; }); + net::signal_set signals(ioc, SIGINT, SIGTERM); + signals.async_wait([&](auto, auto){ ioc.stop(); }); ioc.run(); } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/intro_sync.cpp b/examples/intro_sync.cpp index 497ef883..628fea5f 100644 --- a/examples/intro_sync.cpp +++ b/examples/intro_sync.cpp @@ -19,30 +19,26 @@ using aedis::resp3::request; using aedis::experimental::exec; using connection = aedis::connection<>; -// TODO: What is causing the delay? int main() { try { net::io_context ioc{1}; connection conn{ioc}; - std::thread thread{[&]() { ioc.run(); }}; - - // Calls async_run in the correct executor. - net::dispatch(net::bind_executor(ioc, [&]() { + std::thread thread{[&]() { conn.async_run(net::detached); - })); + ioc.run(); + }}; request req; req.push("PING"); req.push("QUIT"); - // Executes commands synchronously. - std::tuple resp; + std::tuple resp; exec(conn, req, adapt(resp)); thread.join(); - std::cout << "Response: " << std::get<1>(resp) << std::endl; + std::cout << "Response: " << std::get<0>(resp) << std::endl; } catch (std::exception const& e) { std::cerr << e.what() << std::endl; } diff --git a/examples/subscriber.cpp b/examples/subscriber.cpp index edb2a39b..a905608c 100644 --- a/examples/subscriber.cpp +++ b/examples/subscriber.cpp @@ -25,21 +25,16 @@ using tcp_socket = net::use_awaitable_t<>::as_default_on_t using connection = aedis::connection; using net::experimental::as_tuple; -/* In this example we send a subscription to a channel and start - * reading server side messages indefinitely. +/* This example will subscribe and read pushes indefinitely. * - * After starting the example you can test it by sending messages with - * redis-cli like this + * To test send messages with redis-cli * * $ redis-cli -3 * 127.0.0.1:6379> PUBLISH channel1 some-message * (integer) 3 * 127.0.0.1:6379> * - * The messages will then appear on the terminal you are running the - * example. - * - * To test reconnection try for example to close all clients currently + * To test reconnection try, for example, to close all clients currently * connected to the Redis instance * * $ redis-cli @@ -52,7 +47,7 @@ net::awaitable reader(std::shared_ptr db) req.push("SUBSCRIBE", "channel"); for (std::vector resp;;) { - auto [ec, ev, n] = co_await db->async_receive(aedis::adapt(resp), as_tuple(net::use_awaitable)); + auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(resp), as_tuple(net::use_awaitable)); std::cout << "Event: " << aedis::to_string(ev) << std::endl; @@ -60,9 +55,11 @@ net::awaitable reader(std::shared_ptr db) case connection::event::push: print_push(resp); break; - case connection::event::connect: + + case connection::event::hello: co_await db->async_exec(req); break; + default:; } diff --git a/examples/subscriber_sync.cpp b/examples/subscriber_sync.cpp index fb68298c..2e5021a4 100644 --- a/examples/subscriber_sync.cpp +++ b/examples/subscriber_sync.cpp @@ -9,17 +9,18 @@ #include #include #include +#include "print.hpp" // Include this in no more than one .cpp file. #include namespace net = boost::asio; -using aedis::adapt; using aedis::resp3::request; using aedis::experimental::exec; -using aedis::experimental::receive; +using aedis::experimental::receive_event; using connection = aedis::connection<>; -using node = aedis::resp3::node; +using aedis::resp3::node; +using event = connection::event; // See subscriber.cpp for more info about how to run this example. @@ -28,24 +29,30 @@ int main() try { net::io_context ioc{1}; connection conn{ioc}; + conn.get_config().enable_events = true; std::thread thread{[&]() { - request req; - req.push("HELLO", 3); - req.push("SUBSCRIBE", "channel"); - conn.async_run(req, adapt(), net::detached); + conn.async_run(net::detached); ioc.run(); }}; - for (std::vector resp;;) { - boost::system::error_code ec; - receive(conn, adapt(resp), ec); + request req; + req.push("SUBSCRIBE", "channel"); - std::cout - << "Event: " << resp.at(1).value << "\n" - << "Channel: " << resp.at(2).value << "\n" - << "Message: " << resp.at(3).value << "\n" - << std::endl; + for (std::vector> resp;;) { + boost::system::error_code ec; + auto const ev = receive_event(conn, aedis::adapt(resp), ec); + switch (ev) { + case connection::event::push: + print_push(resp); + break; + + case connection::event::hello: + exec(conn, req); + break; + + default:; + } resp.clear(); } diff --git a/include/aedis.hpp b/include/aedis.hpp index 0e23a06e..5743ebdd 100644 --- a/include/aedis.hpp +++ b/include/aedis.hpp @@ -15,7 +15,7 @@ /** \mainpage Documentation \tableofcontents - Useful links: \subpage any, [Benchmarks](benchmarks/benchmarks.md). + Useful links: \subpage any, [Changelog](CHANGELOG.md) and [Benchmarks](benchmarks/benchmarks.md). \section Overview diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index bf97da10..6da62fb4 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -83,10 +83,17 @@ public: bool enable_events = false; }; + /// Events communicated through \c async_receive_event. enum class event { + /// The address has been successfully resolved. resolve, + /// Connected to the Redis server. connect, + /// Success sending AUTH and HELLO. + hello, + /// A push event has been received. push, + /// Used internally. invalid }; @@ -141,7 +148,7 @@ public: * connection::config::ping_interval. * * \li Starts reading from the socket and delivering events to the - * request started with \c async_exec and \c async_receive. + * request started with \c async_exec and \c async_receive_event. * * For an example see echo_server.cpp. * @@ -255,7 +262,7 @@ public: template < class Adapter = detail::response_traits::adapter_type, class CompletionToken = default_completion_token_type> - auto async_receive( + auto async_receive_event( Adapter adapter = adapt(), CompletionToken token = CompletionToken{}) { @@ -268,7 +275,7 @@ public: return boost::asio::async_compose < CompletionToken - , void(boost::system::error_code, event, std::size_t) + , void(boost::system::error_code, event) >(detail::receive_op{this, f}, token, resv_); } @@ -294,7 +301,7 @@ public: * safe to try a reconnect after that, when that happens, all * pending request will be automatically sent. * - * Calling this function will causes @c async_receive to return + * Calling this function will causes @c async_receive_event to return * with @c boost::asio::experimental::channel_errc::channel_cancelled. * * \remarks @@ -513,7 +520,9 @@ char const* to_string(typename connection::event e) switch (e) { case event_type::resolve: return "resolve"; case event_type::connect: return "connect"; + case event_type::hello: return "hello"; case event_type::push: return "push"; + case event_type::invalid: return "invalid"; default: BOOST_ASSERT_MSG(false, "to_string: unhandled event."); } } diff --git a/include/aedis/detail/connection_ops.hpp b/include/aedis/detail/connection_ops.hpp index 7e88a989..c484b36a 100644 --- a/include/aedis/detail/connection_ops.hpp +++ b/include/aedis/detail/connection_ops.hpp @@ -94,7 +94,7 @@ struct receive_op { { yield conn->push_channel_.async_receive(std::move(self)); if (ec) { - self.complete(ec, Conn::event::invalid, 0); + self.complete(ec, Conn::event::invalid); return; } @@ -103,7 +103,7 @@ struct receive_op { yield resp3::async_read(*conn->socket_, conn->make_dynamic_buffer(), adapter, std::move(self)); if (ec) { conn->cancel_run(); - self.complete(ec, Conn::event::invalid, 0); + self.complete(ec, Conn::event::invalid); return; } @@ -111,7 +111,7 @@ struct receive_op { } yield conn->push_channel_.async_send({}, 0, std::move(self)); - self.complete(ec, conn->last_event_, read_size); + self.complete(ec, conn->last_event_); return; } } @@ -418,6 +418,15 @@ struct run_op { return; } + if (conn->cfg_.enable_events) { + conn->last_event_ = Conn::event::connect; + yield async_send_receive(conn->push_channel_, std::move(self)); + if (ec) { + self.complete(ec); + return; + } + } + conn->req_.clear(); if (!std::empty(conn->cfg_.username) && !std::empty(conn->cfg_.password)) conn->req_.push("AUTH", conn->cfg_.username, conn->cfg_.password); @@ -442,7 +451,7 @@ struct run_op { } if (conn->cfg_.enable_events) { - conn->last_event_ = Conn::event::connect; + conn->last_event_ = Conn::event::hello; yield async_send_receive(conn->push_channel_, std::move(self)); if (ec) { self.complete(ec); diff --git a/include/aedis/experimental/sync.hpp b/include/aedis/experimental/sync.hpp index 4d297682..debfd277 100644 --- a/include/aedis/experimental/sync.hpp +++ b/include/aedis/experimental/sync.hpp @@ -7,6 +7,7 @@ #ifndef AEDIS_EXPERIMENTAL_SYNC_HPP #define AEDIS_EXPERIMENTAL_SYNC_HPP +#include #include #include @@ -73,9 +74,14 @@ exec( * @throws std::system_error in case of error. * @returns The number of bytes of the response. */ -template +template < + class Connection, + class ResponseAdapter = aedis::detail::response_traits::adapter_type> std::size_t -exec(Connection& conn, resp3::request const& req, ResponseAdapter adapter) +exec( + Connection& conn, + resp3::request const& req, + ResponseAdapter adapter = aedis::adapt()) { boost::system::error_code ec; auto const res = exec(conn, req, adapter, ec); @@ -95,23 +101,23 @@ exec(Connection& conn, resp3::request const& req, ResponseAdapter adapter) * @returns The number of bytes of the response. */ template -std::size_t -receive( +auto receive_event( Connection& conn, ResponseAdapter adapter, boost::system::error_code& ec) { + using event_type = typename Connection::event; std::mutex mutex; std::condition_variable cv; bool ready = false; - std::size_t res = 0; + event_type ev = event_type::invalid; - auto f = [&conn, &ec, &res, &mutex, &cv, &ready, adapter]() + auto f = [&conn, &ec, &ev, &mutex, &cv, &ready, adapter]() { - conn.async_receive(adapter, [&cv, &mutex, &ready, &res, &ec](auto const& ecp, auto, std::size_t n) { + conn.async_receive_event(adapter, [&cv, &mutex, &ready, &ev, &ec](auto const& ecp, event_type evp) { std::unique_lock ul(mutex); ec = ecp; - res = n; + ev = evp; ready = true; ul.unlock(); cv.notify_one(); @@ -121,7 +127,7 @@ receive( boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f)); std::unique_lock lk(mutex); cv.wait(lk, [&ready]{return ready;}); - return res; + return ev; } } // experimental diff --git a/tests/high_level.cpp b/tests/high_level.cpp index d09fe0b5..88384a53 100644 --- a/tests/high_level.cpp +++ b/tests/high_level.cpp @@ -203,13 +203,13 @@ net::awaitable push_consumer1(std::shared_ptr db, bool& received, char const* msg) { { - auto [ec, ev, n] = co_await db->async_receive(aedis::adapt(), as_tuple(net::use_awaitable)); + auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable)); expect_no_error(ec, msg); received = true; } { - auto [ec, ev, n] = co_await db->async_receive(aedis::adapt(), as_tuple(net::use_awaitable)); + auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable)); expect_error(ec, boost::asio::experimental::channel_errc::channel_cancelled, msg); } } @@ -322,7 +322,7 @@ net::awaitable push_consumer3(std::shared_ptr db) { for (;;) - co_await db->async_receive(aedis::adapt(), net::use_awaitable); + co_await db->async_receive_event(aedis::adapt(), net::use_awaitable); } // Test many subscribe requests.