From c4714d00376fecb9e1428c59f764177b9792ee0d Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Mon, 15 Aug 2022 22:45:55 +0200 Subject: [PATCH] Splits async_receive_event in two functions. --- .codecov.yml | 16 +++++ CHANGELOG.md | 3 + README.md | 12 ++-- examples/chat_room.cpp | 25 +++++--- examples/subscriber.cpp | 28 +++++---- examples/subscriber_sync.cpp | 39 +++++++------ include/aedis/connection.hpp | 53 ++++++++++++----- include/aedis/detail/connection_ops.hpp | 39 +++++-------- include/aedis/experimental/sync.hpp | 66 +++++++++++++++++---- tests/connection.cpp | 77 ++++++++++++++++--------- 10 files changed, 241 insertions(+), 117 deletions(-) create mode 100644 .codecov.yml diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 00000000..100f2463 --- /dev/null +++ b/.codecov.yml @@ -0,0 +1,16 @@ +codecov: + max_report_age: off + require_ci_to_pass: yes + notify: + after_n_builds: 1 + wait_for_ci: yes + +# Change how pull request comments look +comment: + layout: "reach,diff,flags,files,footer" + +ignore: + - "benchmarks/*" + - "examples/*" + - "/usr/*" + - "**/boost/*" diff --git a/CHANGELOG.md b/CHANGELOG.md index 9abcb005..7397f9b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## master + * Split \c connection::async_receive_event in two functions, one to + receive events and other for server side pushes. + * Adds `connection::operation` enum to replace `cancel_*` member functions with a single cancel function that gets what should be cancelled as argument. diff --git a/README.md b/README.md index 572f8c52..4204512a 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,15 @@ +Branch | GH Actions | codecov.io | +:-------------: | ---------- | ---------- | +[`master`](https://github.com/mzimbres/aedis/tree/master) | [![CI](https://github.com/mzimbres/aedis/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/mzimbres/aedis/actions/workflows/ci.yml) | [![codecov](https://codecov.io/gh/mzimbres/aedis/branch/master/graph/badge.svg)](https://codecov.io/gh/mzimbres/aedis/branch/master) + +## Aedis + An async redis client designed for performance and scalability ### License Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). -### Build Status - -Branch | GH Actions | codecov.io | -:-------------: | ---------- | ---------- | -[`master`](https://github.com/mzimbres/aedis/tree/master) | [![CI](https://github.com/mzimbres/aedis/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/mzimbres/aedis/actions/workflows/ci.yml) | [![codecov](https://codecov.io/gh/mzimbres/aedis/branch/master/graph/badge.svg)](https://codecov.io/gh/mzimbres/aedis/branch/master) - ### More information * See the official github-pages for documentation: https://mzimbres.github.io/aedis diff --git a/examples/chat_room.cpp b/examples/chat_room.cpp index 789f7171..ddda7f1b 100644 --- a/examples/chat_room.cpp +++ b/examples/chat_room.cpp @@ -31,25 +31,31 @@ using response_type = std::vector>; // // To see the message traffic. -net::awaitable subscriber(std::shared_ptr db) +net::awaitable push_receiver(std::shared_ptr db) +{ + for (response_type resp;;) { + co_await db->async_receive_push(aedis::adapt(resp)); + print_push(resp); + resp.clear(); + } +} + +net::awaitable event_receiver(std::shared_ptr db) { request req; req.push("SUBSCRIBE", "chat-channel"); - for (response_type resp;;) { - auto const ev = co_await db->async_receive_event(aedis::adapt(resp)); + for (;;) { + auto const ev = co_await db->async_receive_event(); switch (ev) { - case connection::event::push: - print_push(resp); - break; - case connection::event::hello: + // Subscribes to the channels when a new connection is + // stablished. co_await db->async_exec(req); break; default:; } - resp.clear(); } } @@ -75,7 +81,8 @@ int main() db->get_config().enable_events = true; co_spawn(ioc, publisher(in, db), net::detached); - co_spawn(ioc, subscriber(db), net::detached); + co_spawn(ioc, push_receiver(db), net::detached); + co_spawn(ioc, event_receiver(db), net::detached); db->async_run([](auto ec) { std::cout << ec.message() << std::endl; }); diff --git a/examples/subscriber.cpp b/examples/subscriber.cpp index 21eb1090..d6bf12df 100644 --- a/examples/subscriber.cpp +++ b/examples/subscriber.cpp @@ -39,22 +39,23 @@ using connection = aedis::connection; * > CLIENT kill TYPE pubsub */ -net::awaitable receiver(std::shared_ptr db) +net::awaitable push_receiver(std::shared_ptr db) +{ + for (std::vector resp;;) { + co_await db->async_receive_push(aedis::adapt(resp)); + print_push(resp); + resp.clear(); + } +} + +net::awaitable event_receiver(std::shared_ptr db) { request req; req.push("SUBSCRIBE", "channel"); - for (std::vector resp;;) { - auto const ev = co_await db->async_receive_event(aedis::adapt(resp)); - - std::cout << "Event: " << aedis::to_string(ev) << std::endl; - + for (;;) { + auto const ev = co_await db->async_receive_event(); switch (ev) { - case connection::event::push: - print_push(resp); - resp.clear(); - break; - case connection::event::hello: // Subscribes to the channels when a new connection is // stablished. @@ -75,10 +76,13 @@ int main() db->get_config().enable_events = true; db->get_config().enable_reconnect = true; - net::co_spawn(ioc, receiver(db), net::detached); + net::co_spawn(ioc, push_receiver(db), net::detached); + net::co_spawn(ioc, event_receiver(db), net::detached); db->async_run(net::detached); + net::signal_set signals(ioc, SIGINT, SIGTERM); signals.async_wait([&](auto, auto){ ioc.stop(); }); + ioc.run(); } catch (std::exception const& e) { std::cerr << "Error: " << e.what() << std::endl; diff --git a/examples/subscriber_sync.cpp b/examples/subscriber_sync.cpp index 9dc23da8..aac59d24 100644 --- a/examples/subscriber_sync.cpp +++ b/examples/subscriber_sync.cpp @@ -11,6 +11,8 @@ #include #include "print.hpp" +// TODO: Not working. + // Include this in no more than one .cpp file. #include @@ -18,28 +20,30 @@ namespace net = boost::asio; using aedis::resp3::request; using aedis::experimental::exec; using aedis::experimental::receive_event; +using aedis::experimental::receive_push; using connection = aedis::connection<>; using aedis::resp3::node; using event = connection::event; // See subscriber.cpp for more info about how to run this example. -void subscriber(connection& conn) +void push_receiver(connection& conn) +{ + for (std::vector> resp;;) { + receive_push(conn, aedis::adapt(resp)); + print_push(resp); + resp.clear(); + } +} + +void event_receiver(connection& conn) { request req; req.push("SUBSCRIBE", "channel"); - for (std::vector> resp;;) { - auto const ev = receive_event(conn, aedis::adapt(resp)); - switch (ev) { - case connection::event::push: - print_push(resp); - resp.clear(); - break; - + for (;;) { + switch (receive_event(conn)) { case connection::event::hello: - // Subscribes to the channels when a new connection is - // stablished. exec(conn, req); break; @@ -57,13 +61,14 @@ int main() conn.get_config().enable_events = true; conn.get_config().enable_reconnect = true; - std::thread thread{[&]() { - conn.async_run(net::detached); - ioc.run(); - }}; + std::thread push_thread{[&]() { push_receiver(conn); }}; + std::thread event_thread{[&]() { event_receiver(conn); }}; - subscriber(conn); - thread.join(); + conn.async_run(net::detached); + ioc.run(); + + event_thread.join(); + push_thread.join(); } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index b8c3f37e..0c4ef388 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -43,7 +43,7 @@ public: using next_layer_type = AsyncReadWriteStream; using default_completion_token_type = boost::asio::default_completion_token_t; - using channel_type = boost::asio::experimental::channel; + using push_channel_type = boost::asio::experimental::channel; using clock_type = std::chrono::steady_clock; using clock_traits_type = boost::asio::wait_traits; using timer_type = boost::asio::basic_waitable_timer; @@ -97,12 +97,14 @@ public: connect, /// Success sending AUTH and HELLO. hello, - /// A push event has been received. - push, + /// Successful writing on a socket. + write, /// Used internally. invalid }; + using event_channel_type = boost::asio::experimental::channel; + /** @brief Async operations that can be cancelled. * * See the \c cancel member function for more information. @@ -113,7 +115,9 @@ public: /// Operation started with \c async_run. run, /// Operation started with async_receive_event. - receive_event + receive_event, + /// Operation started with async_receive_push. + receive_push, }; /** \brief Construct a connection from an executor. @@ -128,6 +132,7 @@ public: , writer_timer_{ex} , read_timer_{ex} , push_channel_{ex} + , event_channel_{ex} , cfg_{cfg} , last_data_{std::chrono::time_point::min()} { @@ -268,16 +273,13 @@ public: * have the following signature * * @code - * void f(boost::system::error_code, event); + * void f(boost::system::error_code, std::size_t); * @endcode - * - * Where the second parameter is the size of the response that has - * just been read. */ template < class Adapter = detail::response_traits::adapter_type, class CompletionToken = default_completion_token_type> - auto async_receive_event( + auto async_receive_push( Adapter adapter = adapt(), CompletionToken token = CompletionToken{}) { @@ -290,8 +292,26 @@ public: return boost::asio::async_compose < CompletionToken - , void(boost::system::error_code, event) - >(detail::receive_op{this, f}, token, resv_); + , void(boost::system::error_code, std::size_t) + >(detail::receive_push_op{this, f}, token, resv_); + } + + /** @brief Receives internal events. + * + * See enum \c events for a list of events. + * + * \param token The Asio completion token. + * + * The completion token must have the following signature + * + * @code + * void f(boost::system::error_code, event); + * @endcode + */ + template + auto async_receive_event(CompletionToken token = CompletionToken{}) + { + return event_channel_.async_receive(token); } /** @brief Cancel operations. @@ -347,6 +367,11 @@ public: return 1U; } case operation::receive_event: + { + event_channel_.cancel(); + return 1U; + } + case operation::receive_push: { push_channel_.cancel(); return 1U; @@ -375,7 +400,7 @@ private: using time_point_type = std::chrono::time_point; using reqs_type = std::deque>; - template friend struct detail::receive_op; + template friend struct detail::receive_push_op; template friend struct detail::reader_op; template friend struct detail::writer_op; template friend struct detail::ping_op; @@ -518,14 +543,14 @@ private: timer_type check_idle_timer_; timer_type writer_timer_; timer_type read_timer_; - channel_type push_channel_; + push_channel_type push_channel_; + event_channel_type event_channel_; config cfg_; std::string read_buffer_; std::string write_buffer_; std::size_t cmds_ = 0; reqs_type reqs_; - event last_event_ = event::invalid; // Last time we received data. time_point_type last_data_; diff --git a/include/aedis/detail/connection_ops.hpp b/include/aedis/detail/connection_ops.hpp index 8ce53567..1b373c97 100644 --- a/include/aedis/detail/connection_ops.hpp +++ b/include/aedis/detail/connection_ops.hpp @@ -78,7 +78,7 @@ struct resolve_with_timeout_op { }; template -struct receive_op { +struct receive_push_op { Conn* conn = nullptr; Adapter adapter; std::size_t read_size = 0; @@ -94,24 +94,22 @@ struct receive_op { { yield conn->push_channel_.async_receive(std::move(self)); if (ec) { - self.complete(ec, Conn::event::invalid); + self.complete(ec, 0); return; } - if (conn->last_event_ == Conn::event::push) { - BOOST_ASSERT(conn->socket_ != nullptr); - yield resp3::async_read(*conn->socket_, conn->make_dynamic_buffer(), adapter, std::move(self)); - if (ec) { - conn->cancel(Conn::operation::run); - self.complete(ec, Conn::event::invalid); - return; - } - - read_size = n; + BOOST_ASSERT(conn->socket_ != nullptr); + yield resp3::async_read(*conn->socket_, conn->make_dynamic_buffer(), adapter, std::move(self)); + if (ec) { + conn->cancel(Conn::operation::run); + self.complete(ec, 0); + return; } + read_size = n; + yield conn->push_channel_.async_send({}, 0, std::move(self)); - self.complete(ec, conn->last_event_); + self.complete(ec, read_size); return; } } @@ -154,13 +152,12 @@ struct exec_read_op { } // If the next request is a push we have to handle it to - // the receive_op wait for it to be done and continue. + // the receive_push_op wait for it to be done and continue. if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push) { - conn->last_event_ = Conn::event::push; yield async_send_receive(conn->push_channel_, std::move(self)); if (ec) { // Notice we don't call cancel_run() as that is the - // responsability of the receive_op. + // responsability of the receive_push_op. self.complete(ec, 0); return; } @@ -401,8 +398,7 @@ struct run_one_op { } if (conn->cfg_.enable_events) { - conn->last_event_ = Conn::event::resolve; - yield async_send_receive(conn->push_channel_, std::move(self)); + yield conn->event_channel_.async_send({}, Conn::event::resolve, std::move(self)); if (ec) { self.complete(ec); return; @@ -419,8 +415,7 @@ struct run_one_op { } if (conn->cfg_.enable_events) { - conn->last_event_ = Conn::event::connect; - yield async_send_receive(conn->push_channel_, std::move(self)); + yield conn->event_channel_.async_send({}, Conn::event::connect, std::move(self)); if (ec) { self.complete(ec); return; @@ -451,8 +446,7 @@ struct run_one_op { } if (conn->cfg_.enable_events) { - conn->last_event_ = Conn::event::hello; - yield async_send_receive(conn->push_channel_, std::move(self)); + yield conn->event_channel_.async_send({}, Conn::event::hello, std::move(self)); if (ec) { self.complete(ec); return; @@ -592,7 +586,6 @@ struct reader_op { if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push || conn->reqs_.empty() || (!conn->reqs_.empty() && conn->reqs_.front()->cmds == 0)) { - conn->last_event_ = Conn::event::push; yield async_send_receive(conn->push_channel_, std::move(self)); if (ec) { self.complete(ec); diff --git a/include/aedis/experimental/sync.hpp b/include/aedis/experimental/sync.hpp index d24d906b..d2459ee5 100644 --- a/include/aedis/experimental/sync.hpp +++ b/include/aedis/experimental/sync.hpp @@ -101,10 +101,60 @@ exec( * @returns The number of bytes of the response. */ template -auto receive_event( +auto receive_push( Connection& conn, ResponseAdapter adapter, boost::system::error_code& ec) +{ + std::mutex mutex; + std::condition_variable cv; + bool ready = false; + std::size_t n = 0; + + auto f = [&conn, &ec, &mutex, &cv, &n, &ready, adapter]() + { + conn.async_receive_push(adapter, [&cv, &mutex, &n, &ready, &ec](auto const& ecp, std::size_t evp) { + std::unique_lock ul(mutex); + ec = ecp; + n = evp; + ready = true; + ul.unlock(); + cv.notify_one(); + }); + }; + + boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f)); + std::unique_lock lk(mutex); + cv.wait(lk, [&ready]{return ready;}); + return n; +} + +/// TODO +template < + class Connection, + class ResponseAdapter = aedis::detail::response_traits::adapter_type> +auto receive_push( + Connection& conn, + ResponseAdapter adapter = aedis::adapt()) +{ + boost::system::error_code ec; + auto const res = receive_push(conn, adapter, ec); + if (ec) + throw std::system_error(ec); + return res; +} + +/** @brief Receives events + * @ingroup any + * + * This function will block until execution completes. + * + * @param conn The connection. + * @param ec Error code in case of error. + * @returns The event received. + */ +template +auto receive_event(Connection& conn, boost::system::error_code& ec) { using event_type = typename Connection::event; std::mutex mutex; @@ -112,9 +162,9 @@ auto receive_event( bool ready = false; event_type ev = event_type::invalid; - auto f = [&conn, &ec, &ev, &mutex, &cv, &ready, adapter]() + auto f = [&conn, &ec, &ev, &mutex, &cv, &ready]() { - conn.async_receive_event(adapter, [&cv, &mutex, &ready, &ev, &ec](auto const& ecp, event_type evp) { + conn.async_receive_event([&cv, &mutex, &ready, &ev, &ec](auto const& ecp, event_type evp) { std::unique_lock ul(mutex); ec = ecp; ev = evp; @@ -131,15 +181,11 @@ auto receive_event( } /// TODO -template < - class Connection, - class ResponseAdapter = aedis::detail::response_traits::adapter_type> -auto receive_event( - Connection& conn, - ResponseAdapter adapter = aedis::adapt()) +template +auto receive_event(Connection& conn) { boost::system::error_code ec; - auto const res = receive_event(conn, adapter, ec); + auto const res = receive_event(conn, ec); if (ec) throw std::system_error(ec); return res; diff --git a/tests/connection.cpp b/tests/connection.cpp index f4cc829b..c50d8321 100644 --- a/tests/connection.cpp +++ b/tests/connection.cpp @@ -227,44 +227,53 @@ BOOST_AUTO_TEST_CASE(test_idle) } #ifdef BOOST_ASIO_HAS_CO_AWAIT -net::awaitable -push_consumer1(std::shared_ptr db, bool& received) +net::awaitable push_consumer1(std::shared_ptr db, bool& push_received) { { - auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable)); + auto [ec, ev] = co_await db->async_receive_push(aedis::adapt(), as_tuple(net::use_awaitable)); + BOOST_TEST(!ec); + } + + { + auto [ec, ev] = co_await db->async_receive_push(aedis::adapt(), as_tuple(net::use_awaitable)); + BOOST_CHECK_EQUAL(ec, boost::asio::experimental::channel_errc::channel_cancelled); + } + + push_received = true; +} + +net::awaitable event_consumer1(std::shared_ptr db, bool& event_received) +{ + { + auto [ec, ev] = co_await db->async_receive_event(as_tuple(net::use_awaitable)); auto const r = ev == connection::event::resolve; BOOST_TEST(r); BOOST_TEST(!ec); } { - auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable)); + auto [ec, ev] = co_await db->async_receive_event(as_tuple(net::use_awaitable)); auto const r = ev == connection::event::connect; BOOST_TEST(r); BOOST_TEST(!ec); } { - auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable)); + auto [ec, ev] = co_await db->async_receive_event(as_tuple(net::use_awaitable)); auto const r = ev == connection::event::hello; BOOST_TEST(r); BOOST_TEST(!ec); } { - auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable)); - auto const r = ev == connection::event::push; - BOOST_TEST(r); - BOOST_TEST(!ec); - received = true; - } - - { - auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable)); + auto [ec, ev] = co_await db->async_receive_event(as_tuple(net::use_awaitable)); BOOST_CHECK_EQUAL(ec, boost::asio::experimental::channel_errc::channel_cancelled); } + + event_received = true; } + void test_push_is_received1(connection::config const& cfg) { std::cout << "test_push_is_received1" << std::endl; @@ -279,22 +288,29 @@ void test_push_is_received1(connection::config const& cfg) db->async_run(req, aedis::adapt(), [db](auto ec, auto){ BOOST_TEST(!ec); db->cancel(connection::operation::receive_event); + db->cancel(connection::operation::receive_push); }); - bool received = false; + bool push_received = false; net::co_spawn( ioc.get_executor(), - push_consumer1(db, received), + push_consumer1(db, push_received), + net::detached); + + bool event_received = false; + net::co_spawn( + ioc.get_executor(), + event_consumer1(db, event_received), net::detached); ioc.run(); - BOOST_TEST(received); + BOOST_TEST(push_received); + BOOST_TEST(event_received); } void test_push_is_received2(connection::config const& cfg) { - std::cout << "test_push_is_received2" << std::endl; request req1; req1.push("PING", "Message1"); @@ -322,16 +338,25 @@ void test_push_is_received2(connection::config const& cfg) db->async_run([db](auto ec, auto...) { BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof); db->cancel(connection::operation::receive_event); + db->cancel(connection::operation::receive_push); }); - bool received = false; + bool push_received = false; net::co_spawn( ioc.get_executor(), - push_consumer1(db, received), + push_consumer1(db, push_received), + net::detached); + + bool event_received = false; + net::co_spawn( + ioc.get_executor(), + event_consumer1(db, event_received), net::detached); ioc.run(); - BOOST_TEST(received); + + BOOST_TEST(push_received); + BOOST_TEST(event_received); } net::awaitable test_reconnect_impl(std::shared_ptr db) @@ -340,15 +365,15 @@ net::awaitable test_reconnect_impl(std::shared_ptr db) req.push("QUIT"); for (auto i = 0;;) { - auto ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable); + auto ev = co_await db->async_receive_event(net::use_awaitable); auto const r1 = ev == connection::event::resolve; BOOST_TEST(r1); - ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable); + ev = co_await db->async_receive_event(net::use_awaitable); auto const r2 = ev == connection::event::connect; BOOST_TEST(r2); - ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable); + ev = co_await db->async_receive_event(net::use_awaitable); auto const r3 = ev == connection::event::hello; BOOST_TEST(r3); @@ -390,7 +415,7 @@ net::awaitable push_consumer3(std::shared_ptr db) { for (;;) - co_await db->async_receive_event(aedis::adapt(), net::use_awaitable); + co_await db->async_receive_push(aedis::adapt(), net::use_awaitable); } // Test many subscribe requests. @@ -431,7 +456,7 @@ void test_push_many_subscribes(connection::config const& cfg) db->async_run([db](auto ec, auto...) { BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof); - db->cancel(connection::operation::receive_event); + db->cancel(connection::operation::receive_push); }); net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);