From 770e224917e4f3afea2b244cf1448d19f052873c Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 9 Oct 2022 11:40:22 +0200 Subject: [PATCH] Changes: - CI fix. - Renames request::fail_* to request::cancel_*. - Adds a second parameter to async_run. - Adds request::retry flag. --- README.md | 19 ++++++++-- benchmarks/cpp/asio/echo_server_direct.cpp | 1 + examples/echo_server.cpp | 2 +- examples/intro_tls.cpp | 4 +- examples/subscriber.cpp | 4 +- examples/subscriber_sentinel.cpp | 8 ++-- include/aedis/connection.hpp | 2 +- include/aedis/detail/connection_base.hpp | 43 +++++++++++++++------- include/aedis/detail/connection_ops.hpp | 16 ++++---- include/aedis/resp3/request.hpp | 29 +++++++++------ tests/cancelation.cpp | 4 +- tests/connection_connect.cpp | 2 +- tests/connection_other.cpp | 35 +++++++++++++++++- tests/connection_push.cpp | 10 ++--- tests/connection_quit.cpp | 4 +- tests/connection_quit_coalesce.cpp | 2 +- tests/connection_reconnect.cpp | 10 ++--- tests/connection_tls.cpp | 2 +- 18 files changed, 130 insertions(+), 67 deletions(-) diff --git a/README.md b/README.md index a204b68c..f17ffa91 100644 --- a/README.md +++ b/README.md @@ -706,14 +706,25 @@ The code used in the benchmarks can be found at ### master * Renames `fail_on_connection_lost` to - `aedis::resp3::request::fail_on_connection_lost` and change its - behaviour: Setting it to true won't cause the request to be fail if - `async_exec` is called when there is no ongoing connection, which is - not the role of `aedis::resp3::request::fail_on_connection_lost`. + `aedis::resp3::request::cancel_on_connection_lost`. Now, it will + only cause connections to be canceled when `async_run` completes. + +* Introduces `aedis::resp3::request::cancel_if_not_connected` which will + cause a request to be canceled if `async_exec` is called before a + connection has been stablished. + +* Introduces new request flag `aedis::resp3::request::retry` that if + set to true will cause the request to not be canceled when it was + sent to Redis but remained unresponded after `async_run` completed. + It provides a way to avoid executing commands twice. * Removes the `aedis::connection::async_run` overload that takes request and adapter as parameters. +* Adds a second parameter to the `aedis::connection::async_run` + completion signature that contains the number of requests that have + been canceled on its completion. + ### v1.1.0/1 * Removes `coalesce_requests` from the `aedis::connection::config`, it diff --git a/benchmarks/cpp/asio/echo_server_direct.cpp b/benchmarks/cpp/asio/echo_server_direct.cpp index 843fd241..f29617e6 100644 --- a/benchmarks/cpp/asio/echo_server_direct.cpp +++ b/benchmarks/cpp/asio/echo_server_direct.cpp @@ -9,6 +9,7 @@ // #include +#include #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) diff --git a/examples/echo_server.cpp b/examples/echo_server.cpp index dff3f02d..141adfe3 100644 --- a/examples/echo_server.cpp +++ b/examples/echo_server.cpp @@ -56,7 +56,7 @@ auto main() -> int net::io_context ioc{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO}; auto db = std::make_shared(ioc); endpoint ep{"127.0.0.1", "6379"}; - db->async_run(ep, {}, [&](auto const& ec) { + db->async_run(ep, {}, [&](auto const& ec, auto) { std::clog << ec.message() << std::endl; ioc.stop(); }); diff --git a/examples/intro_tls.cpp b/examples/intro_tls.cpp index 0f5c00fe..84b90429 100644 --- a/examples/intro_tls.cpp +++ b/examples/intro_tls.cpp @@ -41,8 +41,8 @@ auto main() -> int conn.next_layer().set_verify_callback(verify_certificate); request req; - req.get_config().fail_if_not_connected = false; - req.get_config().fail_on_connection_lost = true; + req.get_config().cancel_if_not_connected = false; + req.get_config().cancel_on_connection_lost = true; req.push("PING"); req.push("QUIT"); diff --git a/examples/subscriber.cpp b/examples/subscriber.cpp index 1e2a4edd..458984b2 100644 --- a/examples/subscriber.cpp +++ b/examples/subscriber.cpp @@ -60,8 +60,8 @@ net::awaitable push_receiver(std::shared_ptr conn) net::awaitable reconnect(std::shared_ptr conn) { request req; - req.get_config().fail_if_not_connected = false; - req.get_config().fail_on_connection_lost = true; + req.get_config().cancel_if_not_connected = false; + req.get_config().cancel_on_connection_lost = true; req.push("SUBSCRIBE", "channel"); stimer timer{co_await net::this_coro::executor}; diff --git a/examples/subscriber_sentinel.cpp b/examples/subscriber_sentinel.cpp index b4a87f08..7571836e 100644 --- a/examples/subscriber_sentinel.cpp +++ b/examples/subscriber_sentinel.cpp @@ -53,8 +53,8 @@ net::awaitable resolve() }; request req; - req.get_config().fail_if_not_connected = false; - req.get_config().fail_on_connection_lost = true; + req.get_config().cancel_if_not_connected = false; + req.get_config().cancel_on_connection_lost = true; req.push("SENTINEL", "get-master-addr-by-name", "mymaster"); req.push("QUIT"); @@ -88,8 +88,8 @@ net::awaitable resolve() net::awaitable reconnect(std::shared_ptr conn) { request req; - req.get_config().fail_if_not_connected = false; - req.get_config().fail_on_connection_lost = true; + req.get_config().cancel_if_not_connected = false; + req.get_config().cancel_on_connection_lost = true; req.push("SUBSCRIBE", "channel"); auto ex = co_await net::this_coro::executor; diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index a6558b6d..84fb5f00 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -132,7 +132,7 @@ public: * The completion token must have the following signature * * @code - * void f(boost::system::error_code); + * void f(boost::system::error_code, std::size_t); * @endcode */ template > diff --git a/include/aedis/detail/connection_base.hpp b/include/aedis/detail/connection_base.hpp index fd39b15a..064cfadf 100644 --- a/include/aedis/detail/connection_base.hpp +++ b/include/aedis/detail/connection_base.hpp @@ -52,8 +52,8 @@ public: , push_channel_{ex} , last_data_{std::chrono::time_point::min()} { - req_.get_config().fail_if_not_connected = false; - req_.get_config().fail_on_connection_lost = true; + req_.get_config().cancel_if_not_connected = false; + req_.get_config().cancel_on_connection_lost = true; writer_timer_.expires_at(std::chrono::steady_clock::time_point::max()); read_timer_.expires_at(std::chrono::steady_clock::time_point::max()); } @@ -84,17 +84,6 @@ public: writer_timer_.cancel(); ping_timer_.cancel(); - auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return !ptr->req->get_config().fail_on_connection_lost; - }); - - // Cancel own pings if there are any waiting. - std::for_each(point, std::end(reqs_), [](auto const& ptr) { - ptr->stop = true; - ptr->timer.cancel(); - }); - - reqs_.erase(point, std::end(reqs_)); return 1U; } case operation::receive_push: @@ -106,6 +95,32 @@ public: } } + std::size_t cancel_requests() + { + auto cond = [](auto const& ptr) + { + if (ptr->req->get_config().cancel_on_connection_lost) + return false; + + return !(!ptr->req->get_config().retry && ptr->written); + }; + + auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond); + + auto const ret = std::distance(point, std::end(reqs_)); + + std::for_each(point, std::end(reqs_), [](auto const& ptr) { + ptr->stop = true; + ptr->timer.cancel(); + }); + + reqs_.erase(point, std::end(reqs_)); + std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { + return ptr->written = false; + }); + return ret; + } + template < class Adapter = detail::response_traits::adapter_type, class CompletionToken = boost::asio::default_completion_token_t> @@ -143,7 +158,7 @@ public: ep_ = std::move(ep); return boost::asio::async_compose < CompletionToken - , void(boost::system::error_code) + , void(boost::system::error_code, std::size_t) >(detail::run_op{&derived(), ts}, token, resv_); } diff --git a/include/aedis/detail/connection_ops.hpp b/include/aedis/detail/connection_ops.hpp index a9e04f36..29a15c7d 100644 --- a/include/aedis/detail/connection_ops.hpp +++ b/include/aedis/detail/connection_ops.hpp @@ -229,7 +229,7 @@ struct exec_op { { reenter (coro) { - if (req->get_config().fail_if_not_connected && !conn->is_open()) { + if (req->get_config().cancel_if_not_connected && !conn->is_open()) { // The user doesn't want to wait for the connection to be // stablished. self.complete(error::not_connected, 0); @@ -422,7 +422,7 @@ struct run_op { conn->async_resolve_with_timeout(ts.resolve_timeout, std::move(self)); if (ec) { conn->cancel(operation::run); - self.complete(ec); + self.complete(ec, conn->cancel_requests()); return; } @@ -430,7 +430,7 @@ struct run_op { conn->derived().async_connect(conn->endpoints_, ts, conn->ping_timer_, std::move(self)); if (ec) { conn->cancel(operation::run); - self.complete(ec); + self.complete(ec, conn->cancel_requests()); return; } @@ -449,7 +449,7 @@ struct run_op { if (ec) { conn->cancel(operation::run); - self.complete(ec); + self.complete(ec, conn->cancel_requests()); return; } @@ -457,18 +457,16 @@ struct run_op { if (!conn->expect_role(conn->ep_.role)) { conn->cancel(operation::run); - self.complete(error::unexpected_server_role); + self.complete(error::unexpected_server_role, conn->cancel_requests()); return; } conn->write_buffer_.clear(); conn->cmds_ = 0; - std::for_each(std::begin(conn->reqs_), std::end(conn->reqs_), [](auto const& ptr) { - return ptr->written = false; - }); yield conn->async_start(ts, std::move(self)); - self.complete(ec); + + self.complete(ec, conn->cancel_requests()); } } }; diff --git a/include/aedis/resp3/request.hpp b/include/aedis/resp3/request.hpp index 24f6cc8c..b9c580e4 100644 --- a/include/aedis/resp3/request.hpp +++ b/include/aedis/resp3/request.hpp @@ -148,8 +148,8 @@ void add_separator(Request& to) } } // detail -/** @brief Creates Redis requests. - * @ingroup high-level-api +/** \brief Creates Redis requests. + * \ingroup high-level-api * * A request is composed of one or more Redis commands and is * referred to in the redis documentation as a pipeline, see @@ -165,25 +165,25 @@ void add_separator(Request& to) * co_await async_write(socket, buffer(r)); * @endcode * - * @remarks + * \remarks * - * @li Non-string types will be converted to string by using \c + * \li Non-string types will be converted to string by using \c * to_bulk, which must be made available over ADL. - * @li Uses std::string as internal storage. + * \li Uses std::string as internal storage. */ class request { public: /// Request configuration options. struct config { - /** @brief If set to true, requests started with + /** \brief If set to true, requests started with * `connection::async_exe` will fail either if the connection is * lost while the request is pending or if `async_exec` is * called while there is no connection with Redis. The default * behaviour is not to close requests. */ - bool fail_on_connection_lost = false; + bool cancel_on_connection_lost = false; - /** @brief Coalesce this with other requests. + /** \brief Coalesce this with other requests. * * If true this request will be coalesced with other requests, * see https://redis.io/topics/pipelining. If false, this @@ -191,20 +191,25 @@ public: */ bool coalesce = true; - /** @brief If set to true, requests started with + /** \brief If set to true, requests started with * `connection::async_exe` will fail if the called happens * before the connection with Redis is stablished. */ - bool fail_if_not_connected = false; + bool cancel_if_not_connected = false; - // TODO: Add retry flag. + /** \brief If true, the implementation will resend this + * request if it remained unresponded when + * `aedis::connection::async_run` completed. Has effect only if + * cancel_on_connection_lost is false. + */ + bool retry = true; }; /** @brief Constructor * * @param cfg Configuration options. */ - explicit request(config cfg = config{false, true, false}) + explicit request(config cfg = config{false, true, false, true}) : cfg_{cfg} {} diff --git a/tests/cancelation.cpp b/tests/cancelation.cpp index d3d2d59b..7a76d38a 100644 --- a/tests/cancelation.cpp +++ b/tests/cancelation.cpp @@ -6,6 +6,7 @@ #include #include +#ifdef BOOST_ASIO_HAS_CO_AWAIT #include #include @@ -24,7 +25,6 @@ using endpoint = aedis::endpoint; using error_code = boost::system::error_code; using net::experimental::as_tuple; -#ifdef BOOST_ASIO_HAS_CO_AWAIT #include using namespace net::experimental::awaitable_operators; @@ -53,4 +53,6 @@ BOOST_AUTO_TEST_CASE(test_cancel_run) net::co_spawn(ioc.get_executor(), async_test_cancel_run(), net::detached); ioc.run(); } +#else +int main(){} #endif diff --git a/tests/connection_connect.cpp b/tests/connection_connect.cpp index 2c00e72a..2ecf130c 100644 --- a/tests/connection_connect.cpp +++ b/tests/connection_connect.cpp @@ -32,7 +32,7 @@ error_code test_async_run(endpoint ep, connection::timeouts cfg = {}) net::io_context ioc; connection db{ioc}; error_code ret; - db.async_run(ep, cfg, [&](auto ec) { ret = ec; }); + db.async_run(ep, cfg, [&](auto ec, auto) { ret = ec; }); ioc.run(); return ret; } diff --git a/tests/connection_other.cpp b/tests/connection_other.cpp index 804ef9ce..f2e3782d 100644 --- a/tests/connection_other.cpp +++ b/tests/connection_other.cpp @@ -149,7 +149,7 @@ BOOST_AUTO_TEST_CASE(test_wrong_data_type) db->async_exec(req, adapt(resp), [](auto ec, auto){ BOOST_CHECK_EQUAL(ec, aedis::error::not_a_number); }); - db->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){ + db->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){ BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); }); @@ -160,7 +160,7 @@ BOOST_AUTO_TEST_CASE(test_not_connected) { std::cout << boost::unit_test::framework::current_test_case().p_name << std::endl; request req; - req.get_config().fail_if_not_connected = true; + req.get_config().cancel_if_not_connected = true; req.push("PING"); net::io_context ioc; @@ -171,3 +171,34 @@ BOOST_AUTO_TEST_CASE(test_not_connected) ioc.run(); } + +BOOST_AUTO_TEST_CASE(test_retry) +{ + std::cout << boost::unit_test::framework::current_test_case().p_name << std::endl; + + request req1; + req1.get_config().cancel_on_connection_lost = true; + req1.push("CLIENT", "PAUSE", 7000); + + request req2; + req2.get_config().cancel_on_connection_lost = false; + req2.get_config().retry = false; + req2.push("PING"); + + net::io_context ioc; + auto db = std::make_shared(ioc); + + db->async_exec(req1, adapt(), [](auto ec, auto){ + BOOST_TEST(!ec); + }); + + db->async_exec(req2, adapt(), [](auto ec, auto){ + BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + }); + + db->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){ + BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout); + }); + + ioc.run(); +} diff --git a/tests/connection_push.cpp b/tests/connection_push.cpp index 6db142f0..8faf75ae 100644 --- a/tests/connection_push.cpp +++ b/tests/connection_push.cpp @@ -38,7 +38,7 @@ void test_missing_push_reader1(bool coalesce) BOOST_TEST(!ec); }); - conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){ + conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){ BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout); }); @@ -57,7 +57,7 @@ void test_missing_push_reader2(bool coalesce) BOOST_TEST(!ec); }); - conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){ + conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){ BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout); }); @@ -77,7 +77,7 @@ void test_missing_push_reader3(bool coalesce) BOOST_TEST(!ec); }); - conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){ + conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){ BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout); }); @@ -135,7 +135,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_errors::channel_cancelled); }); - conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){ + conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){ BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); }); @@ -158,7 +158,7 @@ void test_push_is_received1(bool coalesce) BOOST_TEST(!ec); }); - conn->async_run({"127.0.0.1", "6379"}, {}, [conn](auto ec){ + conn->async_run({"127.0.0.1", "6379"}, {}, [conn](auto ec, auto){ BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof); conn->cancel(operation::receive_push); }); diff --git a/tests/connection_quit.cpp b/tests/connection_quit.cpp index f0d43ac0..446cc02f 100644 --- a/tests/connection_quit.cpp +++ b/tests/connection_quit.cpp @@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE(test_quit_no_coalesce) }); endpoint ep{"127.0.0.1", "6379"}; - conn->async_run(ep, {}, [conn](auto ec){ + conn->async_run(ep, {}, [conn](auto ec, auto){ BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof); conn->cancel(operation::exec); }); @@ -74,7 +74,7 @@ void test_quit2(bool coalesce) BOOST_TEST(!ec); }); - conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec) { + conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto) { BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof); }); diff --git a/tests/connection_quit_coalesce.cpp b/tests/connection_quit_coalesce.cpp index 18b2d950..906243fb 100644 --- a/tests/connection_quit_coalesce.cpp +++ b/tests/connection_quit_coalesce.cpp @@ -48,7 +48,7 @@ BOOST_AUTO_TEST_CASE(test_quit_coalesce) }); endpoint ep{"127.0.0.1", "6379"}; - db->async_run(ep, {}, [db](auto ec){ + db->async_run(ep, {}, [db](auto ec, auto){ BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); db->cancel(operation::exec); }); diff --git a/tests/connection_reconnect.cpp b/tests/connection_reconnect.cpp index adbcb227..1b00bfa9 100644 --- a/tests/connection_reconnect.cpp +++ b/tests/connection_reconnect.cpp @@ -6,6 +6,7 @@ #include #include +#ifdef BOOST_ASIO_HAS_CO_AWAIT #define BOOST_TEST_MODULE low level #include @@ -21,7 +22,6 @@ using connection = aedis::connection<>; using endpoint = aedis::endpoint; using error_code = boost::system::error_code; -#ifdef BOOST_ASIO_HAS_CO_AWAIT #include using namespace boost::asio::experimental::awaitable_operators; @@ -65,8 +65,8 @@ auto async_test_reconnect_timeout() -> net::awaitable boost::system::error_code ec1, ec2; request req1; - req1.get_config().fail_if_not_connected = false; - req1.get_config().fail_on_connection_lost = true; + req1.get_config().cancel_if_not_connected = false; + req1.get_config().cancel_on_connection_lost = true; req1.push("CLIENT", "PAUSE", 7000); co_await ( @@ -78,8 +78,8 @@ auto async_test_reconnect_timeout() -> net::awaitable BOOST_CHECK_EQUAL(ec2, aedis::error::idle_timeout); request req2; - req2.get_config().fail_if_not_connected = false; - req2.get_config().fail_on_connection_lost = true; + req2.get_config().cancel_if_not_connected = false; + req2.get_config().cancel_on_connection_lost = true; req2.push("QUIT"); co_await ( diff --git a/tests/connection_tls.cpp b/tests/connection_tls.cpp index 5d6b646c..21db3c25 100644 --- a/tests/connection_tls.cpp +++ b/tests/connection_tls.cpp @@ -36,7 +36,7 @@ boost::system::error_code hello_fail(endpoint ep) conn->next_layer().set_verify_mode(net::ssl::verify_peer); conn->next_layer().set_verify_callback(verify_certificate); boost::system::error_code ret; - conn->async_run(ep, {}, [&](auto ec) { + conn->async_run(ep, {}, [&](auto ec, auto) { ret = ec; });