From fd967204dfdac00529475252188456ef7ead4de2 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 12 Mar 2023 21:54:32 +0100 Subject: [PATCH] Implements non-member async_run for plain connections. This function will resolve and connect before calling member async_run. --- CMakeLists.txt | 18 +- README.md | 3 + examples/{common => }/boost_redis.cpp | 0 examples/common/common.cpp | 68 ------ examples/common/common.hpp | 33 --- examples/cpp17_intro.cpp | 74 ++---- examples/cpp17_intro_sync.cpp | 27 ++- examples/cpp20_chat_room.cpp | 11 +- examples/cpp20_containers.cpp | 10 +- examples/cpp20_echo_server.cpp | 10 +- examples/cpp20_intro.cpp | 13 +- examples/cpp20_intro_awaitable_ops.cpp | 9 +- examples/cpp20_json.cpp | 6 +- examples/cpp20_protobuf.cpp | 6 +- examples/cpp20_resolve_with_sentinel.cpp | 8 +- examples/cpp20_subscriber.cpp | 13 +- examples/{common => }/main.cpp | 4 +- examples/start.cpp | 33 +++ examples/start.hpp | 17 ++ include/boost/redis.hpp | 1 + .../run.hpp => check_health.hpp} | 28 ++- include/boost/redis/connection.hpp | 8 +- .../boost/redis/detail/connection_base.hpp | 15 +- include/boost/redis/detail/runner.hpp | 225 ++++++++++++++++++ include/boost/redis/error.hpp | 6 + include/boost/redis/impl/error.ipp | 2 + include/boost/redis/operation.hpp | 2 + include/boost/redis/run.hpp | 63 +++++ include/boost/redis/ssl/connection.hpp | 2 +- tests/common.hpp | 15 +- tests/conn_check_health.cpp | 13 +- tests/conn_echo_stress.cpp | 10 +- tests/conn_exec.cpp | 10 +- tests/conn_exec_cancel.cpp | 19 +- tests/conn_exec_error.cpp | 27 +-- tests/conn_exec_retry.cpp | 15 +- tests/conn_push.cpp | 190 ++++++++------- tests/conn_quit.cpp | 10 +- tests/conn_reconnect.cpp | 17 +- tests/conn_run_cancel.cpp | 16 +- tests/conn_tls.cpp | 12 + tests/issue_50.cpp | 13 +- tests/run.cpp | 76 ++++++ 43 files changed, 719 insertions(+), 439 deletions(-) rename examples/{common => }/boost_redis.cpp (100%) delete mode 100644 examples/common/common.cpp delete mode 100644 examples/common/common.hpp rename examples/{common => }/main.cpp (92%) create mode 100644 examples/start.cpp create mode 100644 examples/start.hpp rename include/boost/redis/{experimental/run.hpp => check_health.hpp} (85%) create mode 100644 include/boost/redis/detail/runner.hpp create mode 100644 include/boost/redis/run.hpp create mode 100644 tests/run.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 21b59820..5c30a93c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -59,9 +59,9 @@ include_directories(include) #======================================================================= add_library(common STATIC - examples/common/common.cpp - examples/common/main.cpp - examples/common/boost_redis.cpp + examples/start.cpp + examples/main.cpp + examples/boost_redis.cpp ) target_compile_features(common PUBLIC cxx_std_20) if (MSVC) @@ -98,12 +98,10 @@ if (MSVC) target_compile_definitions(cpp17_intro PRIVATE _WIN32_WINNT=0x0601) endif() +if (NOT MSVC) add_executable(cpp17_intro_sync examples/cpp17_intro_sync.cpp) target_compile_features(cpp17_intro_sync PUBLIC cxx_std_17) add_test(cpp17_intro_sync cpp17_intro_sync) -if (MSVC) - target_compile_options(cpp17_intro_sync PRIVATE /bigobj) - target_compile_definitions(cpp17_intro_sync PRIVATE _WIN32_WINNT=0x0601) endif() if (NOT MSVC) @@ -320,6 +318,14 @@ target_link_libraries(test_conn_check_health common) add_test(test_conn_check_health test_conn_check_health) endif() +add_executable(test_run tests/run.cpp) +target_compile_features(test_run PUBLIC cxx_std_17) +add_test(test_run test_run) +if (MSVC) + target_compile_options(test_run PRIVATE /bigobj) + target_compile_definitions(test_run PRIVATE _WIN32_WINNT=0x0601) +endif() + # Install #======================================================================= diff --git a/README.md b/README.md index 50324d55..ae682a4c 100644 --- a/README.md +++ b/README.md @@ -831,6 +831,9 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. * Adds a function that performs health checks, see `boost::redis::experimental::async_check_health`. +* Adds non-member `async_run` function that resolves, connects and + calls member `async_run` on a connection object. + ### v1.4.0-1 * Renames `retry_on_connection_lost` to `cancel_if_unresponded`. (v1.4.1) diff --git a/examples/common/boost_redis.cpp b/examples/boost_redis.cpp similarity index 100% rename from examples/common/boost_redis.cpp rename to examples/boost_redis.cpp diff --git a/examples/common/common.cpp b/examples/common/common.cpp deleted file mode 100644 index 8b6725c7..00000000 --- a/examples/common/common.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#include "common.hpp" - -#include -#if defined(BOOST_ASIO_HAS_CO_AWAIT) -#include -#include - -namespace net = boost::asio; -using namespace net::experimental::awaitable_operators; -using resolver = net::use_awaitable_t<>::as_default_on_t; -using timer_type = net::use_awaitable_t<>::as_default_on_t; -using boost::redis::request; -using boost::redis::operation; - -namespace -{ -auto redir(boost::system::error_code& ec) - { return net::redirect_error(net::use_awaitable, ec); } -} - -auto -connect( - std::shared_ptr conn, - std::string const& host, - std::string const& port) -> net::awaitable -{ - auto ex = co_await net::this_coro::executor; - resolver resv{ex}; - timer_type timer{ex}; - - boost::system::error_code ec; - timer.expires_after(std::chrono::seconds{5}); - auto const addrs = co_await (resv.async_resolve(host, port) || timer.async_wait(redir(ec))); - if (!ec) - throw std::runtime_error("Resolve timeout"); - - timer.expires_after(std::chrono::seconds{5}); - co_await (net::async_connect(conn->next_layer(), std::get<0>(addrs)) || timer.async_wait(redir(ec))); - if (!ec) - throw std::runtime_error("Connect timeout"); -} - -auto run(net::awaitable op) -> int -{ - try { - net::io_context ioc; - net::co_spawn(ioc, std::move(op), [](std::exception_ptr p) { - if (p) - std::rethrow_exception(p); - }); - ioc.run(); - - return 0; - - } catch (std::exception const& e) { - std::cerr << "Error: " << e.what() << std::endl; - } - - return 1; -} - -#endif // defined(BOOST_ASIO_HAS_CO_AWAIT) diff --git a/examples/common/common.hpp b/examples/common/common.hpp deleted file mode 100644 index dc5a462c..00000000 --- a/examples/common/common.hpp +++ /dev/null @@ -1,33 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#ifndef BOOST_REDIS_EXAMPLES_COMMON_HPP -#define BOOST_REDIS_EXAMPLES_COMMON_HPP - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#if defined(BOOST_ASIO_HAS_CO_AWAIT) - -using connection = boost::asio::use_awaitable_t<>::as_default_on_t; - -auto -connect( - std::shared_ptr conn, - std::string const& host, - std::string const& port) -> boost::asio::awaitable; - -auto run(boost::asio::awaitable op) -> int; - -#endif // defined(BOOST_ASIO_HAS_CO_AWAIT) -#endif // BOOST_REDIS_EXAMPLES_COMMON_HPP diff --git a/examples/cpp17_intro.cpp b/examples/cpp17_intro.cpp index e1d34f18..d53ee587 100644 --- a/examples/cpp17_intro.cpp +++ b/examples/cpp17_intro.cpp @@ -10,16 +10,12 @@ #include namespace net = boost::asio; -namespace redis = boost::redis; -using redis::operation; -using redis::request; -using redis::response; -using redis::ignore_t; - -void log(boost::system::error_code const& ec, char const* prefix) -{ - std::clog << prefix << ec.message() << std::endl; -} +using boost::redis::connection; +using boost::redis::request; +using boost::redis::response; +using boost::redis::ignore_t; +using boost::redis::async_run; +using namespace std::chrono_literals; auto main(int argc, char * argv[]) -> int { @@ -36,59 +32,22 @@ auto main(int argc, char * argv[]) -> int request req; req.push("HELLO", 3); req.push("PING", "Hello world"); - req.push("QUIT"); // The response. - response resp; + response resp; net::io_context ioc; + connection conn{ioc}; - // IO objects. - net::ip::tcp::resolver resv{ioc}; - redis::connection conn{ioc}; + async_run(conn, host, port, 10s, 10s, [&](auto){ + conn.cancel(); + }); - // Resolve endpoints. - net::ip::tcp::resolver::results_type endpoints; - - // async_run callback. - auto on_run = [](auto ec) - { - if (ec) - return log(ec, "on_run: "); - }; - - // async_exec callback. - auto on_exec = [&](auto ec, auto) - { - if (ec) { - conn.cancel(operation::run); - return log(ec, "on_exec: "); - } - - std::cout << "PING: " << std::get<1>(resp).value() << std::endl; - }; - - // Connect callback. - auto on_connect = [&](auto ec, auto) - { - if (ec) - return log(ec, "on_connect: "); - - conn.async_run(on_run); - conn.async_exec(req, resp, on_exec); - }; - - // Resolve callback. - auto on_resolve = [&](auto ec, auto const& addrs) - { - if (ec) - return log(ec, "on_resolve: "); - - endpoints = addrs; - net::async_connect(conn.next_layer(), endpoints, on_connect); - }; - - resv.async_resolve(host, port, on_resolve); + conn.async_exec(req, resp, [&](auto ec, auto){ + if (!ec) + std::cout << "PING: " << std::get<1>(resp).value() << std::endl; + conn.cancel(); + }); ioc.run(); return 0; @@ -96,7 +55,6 @@ auto main(int argc, char * argv[]) -> int } catch (std::exception const& e) { std::cerr << "Error: " << e.what() << std::endl; } - return 1; } diff --git a/examples/cpp17_intro_sync.cpp b/examples/cpp17_intro_sync.cpp index e6ba8b96..5621b665 100644 --- a/examples/cpp17_intro_sync.cpp +++ b/examples/cpp17_intro_sync.cpp @@ -6,19 +6,25 @@ #include #include +#include #include #include #include #include +#include // Include this in no more than one .cpp file. #include namespace net = boost::asio; using connection = boost::redis::connection; +using boost::redis::operation; using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; +using boost::redis::async_run; +using boost::redis::async_check_health; +using namespace std::chrono_literals; template auto exec(std::shared_ptr conn, request const& req, Response& resp) @@ -29,9 +35,6 @@ auto exec(std::shared_ptr conn, request const& req, Response& resp) (net::use_future).get(); } -auto logger = [](auto const& ec) - { std::clog << "Run: " << ec.message() << std::endl; }; - auto main(int argc, char * argv[]) -> int { try { @@ -47,17 +50,17 @@ auto main(int argc, char * argv[]) -> int auto conn = std::make_shared(ioc); - // Resolves the address - net::ip::tcp::resolver resv{ioc}; - auto const res = resv.resolve(host, port); + // Starts a thread that will can io_context::run on which the + // connection will run. + std::thread t{[&ioc, conn, host, port]() { + async_run(*conn, host, port, 10s, 10s, [conn](auto){ + conn->cancel(); + }); - // Connect to Redis - net::connect(conn->next_layer(), res); + async_check_health(*conn, "Boost.Redis", 2s, [conn](auto) { + conn->cancel(); + }); - // Starts a thread that will can io_context::run on which - // the connection will run. - std::thread t{[conn, &ioc]() { - conn->async_run(logger); ioc.run(); }}; diff --git a/examples/cpp20_chat_room.cpp b/examples/cpp20_chat_room.cpp index 744fa70a..f02bb541 100644 --- a/examples/cpp20_chat_room.cpp +++ b/examples/cpp20_chat_room.cpp @@ -11,17 +11,17 @@ namespace net = boost::asio; #if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR) #include #include -#include +#include #include -#include "common/common.hpp" - using namespace net::experimental::awaitable_operators; using stream_descriptor = net::use_awaitable_t<>::as_default_on_t; using signal_set = net::use_awaitable_t<>::as_default_on_t; using boost::redis::request; using boost::redis::generic_response; -using boost::redis::experimental::async_check_health; +using boost::redis::async_check_health; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; // Chat over Redis pubsub. To test, run this program from multiple // terminals and type messages to stdin. @@ -60,8 +60,7 @@ auto co_main(std::string host, std::string port) -> net::awaitable req.push("HELLO", 3); req.push("SUBSCRIBE", "chat-channel"); - co_await connect(conn, host, port); - co_await ((conn->async_run() || publisher(stream, conn) || receiver(conn) || + co_await ((async_run(*conn, host, port) || publisher(stream, conn) || receiver(conn) || async_check_health(*conn) || sig.async_wait()) && conn->async_exec(req)); } diff --git a/examples/cpp20_containers.cpp b/examples/cpp20_containers.cpp index e29fc900..f000a6ef 100644 --- a/examples/cpp20_containers.cpp +++ b/examples/cpp20_containers.cpp @@ -6,19 +6,18 @@ #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) -#include #include #include #include - -#include "common/common.hpp" +#include namespace net = boost::asio; namespace redis = boost::redis; -using namespace net::experimental::awaitable_operators; using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; void print(std::map const& cont) { @@ -34,8 +33,7 @@ void print(std::vector const& cont) auto run(std::shared_ptr conn, std::string host, std::string port) -> net::awaitable { - co_await connect(conn, host, port); - co_await conn->async_run(); + co_await async_run(*conn, host, port); } // Stores the content of some STL containers in Redis. diff --git a/examples/cpp20_echo_server.cpp b/examples/cpp20_echo_server.cpp index 292f7098..0be953c4 100644 --- a/examples/cpp20_echo_server.cpp +++ b/examples/cpp20_echo_server.cpp @@ -8,8 +8,7 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include -#include -#include "common/common.hpp" +#include namespace net = boost::asio; using namespace net::experimental::awaitable_operators; @@ -18,7 +17,9 @@ using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t::as_default_on_t; using boost::redis::request; using boost::redis::response; -using boost::redis::experimental::async_check_health; +using boost::redis::async_check_health; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; auto echo_server_session(tcp_socket socket, std::shared_ptr conn) -> net::awaitable { @@ -55,8 +56,7 @@ auto co_main(std::string host, std::string port) -> net::awaitable request req; req.push("HELLO", 3); - co_await connect(conn, host, port); - co_await ((conn->async_run() || listener(conn) || async_check_health(*conn) || + co_await ((async_run(*conn, host, port) || listener(conn) || async_check_health(*conn) || sig.async_wait()) && conn->async_exec(req)); } diff --git a/examples/cpp20_intro.cpp b/examples/cpp20_intro.cpp index 1dc92079..fc75a724 100644 --- a/examples/cpp20_intro.cpp +++ b/examples/cpp20_intro.cpp @@ -7,25 +7,22 @@ #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include -#include "common/common.hpp" +#include namespace net = boost::asio; -using boost::redis::operation; using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; auto run(std::shared_ptr conn, std::string host, std::string port) -> net::awaitable { - // From examples/common.hpp to avoid vebosity - co_await connect(conn, host, port); - // async_run coordinate read and write operations. - co_await conn->async_run(); + co_await async_run(*conn, host, port); // Cancel pending operations, if any. - conn->cancel(operation::exec); - conn->cancel(operation::receive); + conn->cancel(); } // Called from the main function (see main.cpp) diff --git a/examples/cpp20_intro_awaitable_ops.cpp b/examples/cpp20_intro_awaitable_ops.cpp index 7759fbc2..2c303e48 100644 --- a/examples/cpp20_intro_awaitable_ops.cpp +++ b/examples/cpp20_intro_awaitable_ops.cpp @@ -4,17 +4,19 @@ * accompanying file LICENSE.txt) */ +#include #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include -#include "common/common.hpp" namespace net = boost::asio; using namespace net::experimental::awaitable_operators; using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; // Called from the main function (see main.cpp) auto co_main(std::string host, std::string port) -> net::awaitable @@ -27,9 +29,8 @@ auto co_main(std::string host, std::string port) -> net::awaitable response resp; - auto conn = std::make_shared(co_await net::this_coro::executor); - co_await connect(conn, host, port); - co_await (conn->async_run() || conn->async_exec(req, resp)); + connection conn{co_await net::this_coro::executor}; + co_await (async_run(conn, host, port) || conn.async_exec(req, resp)); std::cout << "PING: " << std::get<1>(resp).value() << std::endl; } catch (std::exception const& e) { diff --git a/examples/cpp20_json.cpp b/examples/cpp20_json.cpp index 968889c4..8a922259 100644 --- a/examples/cpp20_json.cpp +++ b/examples/cpp20_json.cpp @@ -12,7 +12,6 @@ #include #include #include -#include "common/common.hpp" #include "json.hpp" // Include this in no more than one .cpp file. @@ -25,6 +24,8 @@ using boost::redis::request; using boost::redis::response; using boost::redis::operation; using boost::redis::ignore_t; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; // Struct that will be stored in Redis using json serialization. struct user { @@ -45,8 +46,7 @@ void boost_redis_from_bulk(user& u, std::string_view sv, boost::system::error_co auto run(std::shared_ptr conn, std::string host, std::string port) -> net::awaitable { - co_await connect(conn, host, port); - co_await conn->async_run(); + co_await async_run(*conn, host, port); } net::awaitable co_main(std::string host, std::string port) diff --git a/examples/cpp20_protobuf.cpp b/examples/cpp20_protobuf.cpp index cb807370..9bc89264 100644 --- a/examples/cpp20_protobuf.cpp +++ b/examples/cpp20_protobuf.cpp @@ -8,7 +8,6 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include -#include "common/common.hpp" #include "protobuf.hpp" // See the definition in person.proto. This header is automatically @@ -21,6 +20,8 @@ using boost::redis::request; using boost::redis::response; using boost::redis::operation; using boost::redis::ignore_t; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; // The protobuf type described in examples/person.proto using tutorial::person; @@ -42,8 +43,7 @@ using tutorial::boost_redis_from_bulk; auto run(std::shared_ptr conn, std::string host, std::string port) -> net::awaitable { - co_await connect(conn, host, port); - co_await conn->async_run(); + co_await async_run(*conn, host, port); } net::awaitable co_main(std::string host, std::string port) diff --git a/examples/cpp20_resolve_with_sentinel.cpp b/examples/cpp20_resolve_with_sentinel.cpp index d461513d..71cc1d87 100644 --- a/examples/cpp20_resolve_with_sentinel.cpp +++ b/examples/cpp20_resolve_with_sentinel.cpp @@ -4,19 +4,20 @@ * accompanying file LICENSE.txt) */ +#include #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include -#include "common/common.hpp" - namespace net = boost::asio; using namespace net::experimental::awaitable_operators; using endpoints = net::ip::tcp::resolver::results_type; using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; auto redir(boost::system::error_code& ec) { return net::redirect_error(net::use_awaitable, ec); } @@ -40,8 +41,7 @@ auto resolve_master_address(std::vector
const& endpoints) -> net::await response>, ignore_t> addr; for (auto ep : endpoints) { boost::system::error_code ec; - co_await connect(conn, ep.host, ep.port); - co_await (conn->async_run() && conn->async_exec(req, addr, redir(ec))); + co_await (async_run(*conn, ep.host, ep.port) && conn->async_exec(req, addr, redir(ec))); conn->reset_stream(); if (std::get<0>(addr)) co_return address{std::get<0>(addr).value().value().at(0), std::get<0>(addr).value().value().at(1)}; diff --git a/examples/cpp20_subscriber.cpp b/examples/cpp20_subscriber.cpp index 44f9c953..b34c5e8f 100644 --- a/examples/cpp20_subscriber.cpp +++ b/examples/cpp20_subscriber.cpp @@ -4,20 +4,21 @@ * accompanying file LICENSE.txt) */ +#include #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include #include -#include - -#include "common/common.hpp" +#include namespace net = boost::asio; using namespace net::experimental::awaitable_operators; using steady_timer = net::use_awaitable_t<>::as_default_on_t; using boost::redis::request; +using boost::redis::async_run; using boost::redis::generic_response; -using boost::redis::experimental::async_check_health; +using boost::redis::async_check_health; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; /* This example will subscribe and read pushes indefinitely. * @@ -57,9 +58,7 @@ auto co_main(std::string host, std::string port) -> net::awaitable // The loop will reconnect on connection lost. To exit type Ctrl-C twice. for (;;) { - co_await connect(conn, host, port); - co_await ((conn->async_run() || async_check_health(*conn) || receiver(conn)) && conn->async_exec(req)); - + co_await ((async_run(*conn, host, port) || async_check_health(*conn) || receiver(conn)) && conn->async_exec(req)); conn->reset_stream(); timer.expires_after(std::chrono::seconds{1}); co_await timer.async_wait(); diff --git a/examples/common/main.cpp b/examples/main.cpp similarity index 92% rename from examples/common/main.cpp rename to examples/main.cpp index 33b56d78..f86a0949 100644 --- a/examples/common/main.cpp +++ b/examples/main.cpp @@ -8,7 +8,7 @@ #if defined(BOOST_ASIO_HAS_CO_AWAIT) -#include "common.hpp" +#include "start.hpp" extern boost::asio::awaitable co_main(std::string, std::string); @@ -22,7 +22,7 @@ auto main(int argc, char * argv[]) -> int port = argv[2]; } - return run(co_main(host, port)); + return start(co_main(host, port)); } #else // defined(BOOST_ASIO_HAS_CO_AWAIT) diff --git a/examples/start.cpp b/examples/start.cpp new file mode 100644 index 00000000..7cceb56b --- /dev/null +++ b/examples/start.cpp @@ -0,0 +1,33 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include "start.hpp" +#include +#if defined(BOOST_ASIO_HAS_CO_AWAIT) +#include + +namespace net = boost::asio; + +auto start(net::awaitable op) -> int +{ + try { + net::io_context ioc; + net::co_spawn(ioc, std::move(op), [](std::exception_ptr p) { + if (p) + std::rethrow_exception(p); + }); + ioc.run(); + + return 0; + + } catch (std::exception const& e) { + std::cerr << "Error: " << e.what() << std::endl; + } + + return 1; +} + +#endif // defined(BOOST_ASIO_HAS_CO_AWAIT) diff --git a/examples/start.hpp b/examples/start.hpp new file mode 100644 index 00000000..6388686d --- /dev/null +++ b/examples/start.hpp @@ -0,0 +1,17 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_EXAMPLES_START_HPP +#define BOOST_REDIS_EXAMPLES_START_HPP + +#include + +#if defined(BOOST_ASIO_HAS_CO_AWAIT) + +auto start(boost::asio::awaitable op) -> int; + +#endif // defined(BOOST_ASIO_HAS_CO_AWAIT) +#endif // BOOST_REDIS_EXAMPLES_START_HPP diff --git a/include/boost/redis.hpp b/include/boost/redis.hpp index 0972fe0c..9eac385d 100644 --- a/include/boost/redis.hpp +++ b/include/boost/redis.hpp @@ -11,6 +11,7 @@ #include #include #include +#include /** @defgroup high-level-api Reference * diff --git a/include/boost/redis/experimental/run.hpp b/include/boost/redis/check_health.hpp similarity index 85% rename from include/boost/redis/experimental/run.hpp rename to include/boost/redis/check_health.hpp index c71b47cb..a18960f0 100644 --- a/include/boost/redis/experimental/run.hpp +++ b/include/boost/redis/check_health.hpp @@ -4,17 +4,22 @@ * accompanying file LICENSE.txt) */ -#ifndef BOOST_REDIS_RUN_HPP -#define BOOST_REDIS_RUN_HPP +#ifndef BOOST_REDIS_CHECK_HEALTH_HPP +#define BOOST_REDIS_CHECK_HEALTH_HPP // Has to included before promise.hpp to build on msvc. +#include +#include +#include +#include #include #include #include #include #include +#include -namespace boost::redis::experimental { +namespace boost::redis { namespace detail { template @@ -23,14 +28,17 @@ private: using executor_type = typename Connection::executor_type; struct state { - using clock_type = std::chrono::steady_clock; - using clock_traits_type = asio::wait_traits; - using timer_type = asio::basic_waitable_timer; using promise_type = asio::experimental::promise; + using timer_type = + asio::basic_waitable_timer< + std::chrono::steady_clock, + asio::wait_traits, + executor_type>; + timer_type timer_; - request req_; - generic_response resp_; + redis::request req_; + redis::generic_response resp_; std::optional prom_; std::chrono::steady_clock::duration interval_; @@ -124,6 +132,6 @@ async_check_health( >(detail::check_health_op{conn, msg, interval}, token, conn); } -} // boost::redis::experimental +} // boost::redis -#endif // BOOST_REDIS_RUN_HPP +#endif // BOOST_REDIS_CHECK_HEALTH_HPP diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 8c03ab80..84dd22d5 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -201,15 +201,15 @@ public: * @li `operation::exec`: Cancels operations started with * `async_exec`. Affects only requests that haven't been written * yet. - * @li operation::run: Cancels the `async_run` operation. Notice - * that the preferred way to close a connection is to send a - * [QUIT](https://redis.io/commands/quit/) command to the server. + * @li operation::run: Cancels the `async_run` operation. * @li operation::receive: Cancels any ongoing calls to * `async_receive`. + * @li operation::all: Cancels all operations listed above. This + * is the default argument. * * @param op: The operation to be cancelled. * @returns The number of operations that have been canceled. */ - auto cancel(operation op) -> std::size_t + auto cancel(operation op = operation::all) -> std::size_t { return base_type::cancel(op); } /// Sets the maximum size of the read buffer. diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index 4e63f72a..b478d13c 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -52,7 +52,7 @@ public: auto get_executor() {return writer_timer_.get_executor();} - auto cancel(operation op) -> std::size_t + auto cancel_impl(operation op) -> std::size_t { switch (op) { case operation::exec: @@ -77,6 +77,19 @@ public: } } + auto cancel(operation op) -> std::size_t + { + if (op == operation::all) { + std::size_t ret = 0; + ret += cancel_impl(operation::run); + ret += cancel_impl(operation::receive); + ret += cancel_impl(operation::exec); + return ret; + } + + return cancel_impl(op); + } + auto cancel_unwritten_requests() -> std::size_t { auto f = [](auto const& ptr) diff --git a/include/boost/redis/detail/runner.hpp b/include/boost/redis/detail/runner.hpp new file mode 100644 index 00000000..33bfae9d --- /dev/null +++ b/include/boost/redis/detail/runner.hpp @@ -0,0 +1,225 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_RUNNER_HPP +#define BOOST_REDIS_RUNNER_HPP + +// Has to included before promise.hpp to build on msvc. +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost::redis::detail { + +template +struct resolve_op { + Runner* runner = nullptr; + std::string_view host; + std::string_view port; + asio::coroutine coro{}; + + template + void operator()( Self& self + , std::array order = {} + , system::error_code ec1 = {} + , asio::ip::tcp::resolver::results_type res = {} + , system::error_code ec2 = {}) + { + BOOST_ASIO_CORO_REENTER (coro) + { + BOOST_ASIO_CORO_YIELD + asio::experimental::make_parallel_group( + [this](auto token) { return runner->resv_.async_resolve(host.data(), port.data(), token);}, + [this](auto token) { return runner->timer_.async_wait(token);} + ).async_wait( + asio::experimental::wait_for_one(), + std::move(self)); + + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); + return; + } + + switch (order[0]) { + case 0: { + // Resolver completed first. + runner->endpoints_ = res; + self.complete(ec1); + } break; + + case 1: { + if (ec2) { + // Timer completed first with error, perhaps a + // cancellation going on. + self.complete(ec2); + } else { + // Timer completed first without an error, this is a + // resolve timeout. + self.complete(error::resolve_timeout); + } + } break; + + default: BOOST_ASSERT(false); + } + } + } +}; + +template +struct connect_op { + Runner* runner = nullptr; + Stream* stream = nullptr; + asio::coroutine coro{}; + + template + void operator()( Self& self + , std::array order = {} + , system::error_code ec1 = {} + , asio::ip::tcp::endpoint const& = {} + , system::error_code ec2 = {}) + { + BOOST_ASIO_CORO_REENTER (coro) + { + BOOST_ASIO_CORO_YIELD + asio::experimental::make_parallel_group( + [this](auto token) + { + auto f = [](system::error_code const&, auto const&) { return true; }; + return asio::async_connect(*stream, runner->endpoints_, f, token); + }, + [this](auto token) { return runner->timer_.async_wait(token);} + ).async_wait( + asio::experimental::wait_for_one(), + std::move(self)); + + if (is_cancelled(self)) { + self.complete(asio::error::operation_aborted); + return; + } + + switch (order[0]) { + case 0: { + self.complete(ec1); + } break; + case 1: + { + if (ec2) { + self.complete(ec2); + } else { + self.complete(error::connect_timeout); + } + } break; + + default: BOOST_ASSERT(false); + } + } + } +}; + +template +struct runner_op { + Runner* runner = nullptr; + Connection* conn = nullptr; + std::string_view host; + std::string_view port; + std::chrono::steady_clock::duration resolve_timeout; + std::chrono::steady_clock::duration connect_timeout; + asio::coroutine coro{}; + + template + void operator()(Self& self, system::error_code ec = {}) + { + BOOST_ASIO_CORO_REENTER (coro) + { + runner->timer_.expires_after(resolve_timeout); + BOOST_ASIO_CORO_YIELD + runner->async_resolve(host, port, std::move(self)); + AEDIS_CHECK_OP0(;) + + runner->timer_.expires_after(connect_timeout); + BOOST_ASIO_CORO_YIELD + runner->async_connect(conn->next_layer(), std::move(self)); + AEDIS_CHECK_OP0(;) + + BOOST_ASIO_CORO_YIELD + conn->async_run(std::move(self)); + AEDIS_CHECK_OP0(;) + self.complete({}); + } + } +}; + +template +class runner { +public: + runner(Executor ex): resv_{ex}, timer_{ex} {} + + template + auto + async_resolve( + std::string_view host, + std::string_view port, + CompletionToken&& token) + { + return asio::async_compose + < CompletionToken + , void(system::error_code) + >(resolve_op{this, host, port}, token, resv_); + } + + template + auto async_connect(Stream& stream, CompletionToken&& token) + { + return asio::async_compose + < CompletionToken + , void(system::error_code) + >(connect_op{this, &stream}, token, resv_); + } + + template + auto + async_run( + Connection& conn, + std::string_view host, + std::string_view port, + std::chrono::steady_clock::duration resolve_timeout, + std::chrono::steady_clock::duration connect_timeout, + CompletionToken&& token) + { + return asio::async_compose + < CompletionToken + , void(system::error_code) + >(runner_op{this, &conn, host, port, resolve_timeout, connect_timeout}, token, resv_); + } + +private: + using resolver_type = asio::ip::basic_resolver; + using timer_type = + asio::basic_waitable_timer< + std::chrono::steady_clock, + asio::wait_traits, + Executor>; + + + template friend struct runner_op; + template friend struct connect_op; + template friend struct resolve_op; + + resolver_type resv_; + timer_type timer_; + asio::ip::tcp::resolver::results_type endpoints_; +}; + +} // boost::redis::detail + +#endif // BOOST_REDIS_RUNNER_HPP diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index b154dc56..f61325af 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -63,6 +63,12 @@ enum class error /// There is no stablished connection. not_connected, + + /// Resolve timeout + resolve_timeout, + + /// Connect timeout + connect_timeout, }; /** \internal diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index 42e72e76..502bd9ec 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -38,6 +38,8 @@ struct error_category_impl : system::error_category { case error::not_a_double: return "Not a double."; case error::resp3_null: return "Got RESP3 null."; case error::not_connected: return "Not connected."; + case error::resolve_timeout: return "Resolve timeout."; + case error::connect_timeout: return "Connect timeout."; default: BOOST_ASSERT(false); return "Boost.Redis error."; } } diff --git a/include/boost/redis/operation.hpp b/include/boost/redis/operation.hpp index 99d4f2a9..58f3c6de 100644 --- a/include/boost/redis/operation.hpp +++ b/include/boost/redis/operation.hpp @@ -22,6 +22,8 @@ enum class operation { run, /// Refers to `connection::async_receive` operations. receive, + /// Refers to all operations. + all, }; } // boost::redis diff --git a/include/boost/redis/run.hpp b/include/boost/redis/run.hpp new file mode 100644 index 00000000..1ca54fe1 --- /dev/null +++ b/include/boost/redis/run.hpp @@ -0,0 +1,63 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_RUN_HPP +#define BOOST_REDIS_RUN_HPP + +// Has to included before promise.hpp to build on msvc. +#include +#include +#include +#include +#include +#include + +namespace boost::redis { + +/** @brief Call async_run on the connection. + * @ingroup high-level-api + * + * This is a facility function that + * 1. Resoves the endpoint. + * 2. Connects to one of the endpoints from 1. + * 3. Calls async_run on the underlying connection. + * + * @param host Redis host to connect to. + * @param port Redis port to connect to. + * @param resolve_timeout Time the resolve operation is allowed to take. + * @param connect_timeout Time the connect operation is allowed to take. + * @param token Completion token. + */ +template < + class Socket, + class CompletionToken = asio::default_completion_token_t +> +auto +async_run( + basic_connection& conn, + std::string_view host = "127.0.0.1", + std::string_view port = "6379", + std::chrono::steady_clock::duration resolve_timeout = std::chrono::seconds{10}, + std::chrono::steady_clock::duration connect_timeout = std::chrono::seconds{10}, + CompletionToken token = CompletionToken{}) +{ + using executor_type = typename Socket::executor_type; + using runner_type = detail::runner; + auto runner = std::make_shared(conn.get_executor()); + + return + runner->async_run( + conn, + host, + port, + resolve_timeout, + connect_timeout, + asio::consign(std::move(token), runner)); +} + +} // boost::redis + +#endif // BOOST_REDIS_RUN_HPP diff --git a/include/boost/redis/ssl/connection.hpp b/include/boost/redis/ssl/connection.hpp index 7b249d67..b9e983f2 100644 --- a/include/boost/redis/ssl/connection.hpp +++ b/include/boost/redis/ssl/connection.hpp @@ -122,7 +122,7 @@ public: * * See redis::connection::cancel for more information. */ - auto cancel(operation op) -> std::size_t + auto cancel(operation op = operation::all) -> std::size_t { return base_type::cancel(op); } auto& lowest_layer() noexcept { return stream_.lowest_layer(); } diff --git a/tests/common.hpp b/tests/common.hpp index a6eb4ba2..362cc9c3 100644 --- a/tests/common.hpp +++ b/tests/common.hpp @@ -1,22 +1,9 @@ #pragma once #include -#include - -namespace net = boost::asio; -using endpoints = net::ip::tcp::resolver::results_type; - -auto -resolve( - std::string const& host = "127.0.0.1", - std::string const& port = "6379") -> endpoints -{ - net::io_context ioc; - net::ip::tcp::resolver resv{ioc}; - return resv.resolve(host, port); -} #ifdef BOOST_ASIO_HAS_CO_AWAIT +namespace net = boost::asio; inline auto redir(boost::system::error_code& ec) { return net::redirect_error(net::use_awaitable, ec); } diff --git a/tests/conn_check_health.cpp b/tests/conn_check_health.cpp index 5b7e2962..096a7c52 100644 --- a/tests/conn_check_health.cpp +++ b/tests/conn_check_health.cpp @@ -12,7 +12,7 @@ #include #include -#include +#include #include #include "common.hpp" @@ -24,7 +24,9 @@ using boost::redis::request; using boost::redis::ignore; using boost::redis::operation; using boost::redis::generic_response; -using boost::redis::experimental::async_check_health; +using boost::redis::async_check_health; +using boost::redis::async_run; +using namespace std::chrono_literals; std::chrono::seconds const interval{1}; @@ -74,14 +76,11 @@ BOOST_AUTO_TEST_CASE(check_health) { net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); // It looks like client pause does not work for clients that are // sending MONITOR. I will therefore open a second connection. connection conn2{ioc}; - net::connect(conn2.next_layer(), endpoints); std::string const msg = "test-check-health"; @@ -108,12 +107,12 @@ BOOST_AUTO_TEST_CASE(check_health) generic_response resp; push_callback{&conn, &conn2, &resp, &req2}(); // Starts reading pushes. - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ std::cout << "B" << std::endl; BOOST_TEST(!!ec); }); - conn2.async_run([](auto ec){ + async_run(conn2, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ std::cout << "C" << std::endl; BOOST_TEST(!!ec); }); diff --git a/tests/conn_echo_stress.cpp b/tests/conn_echo_stress.cpp index f6ef4156..ffbd3b5d 100644 --- a/tests/conn_echo_stress.cpp +++ b/tests/conn_echo_stress.cpp @@ -13,7 +13,7 @@ #include #include #include "common.hpp" -#include "../examples/common/common.hpp" +#include "../examples/start.hpp" namespace net = boost::asio; using error_code = boost::system::error_code; @@ -22,6 +22,9 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore; using boost::redis::ignore_t; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; +using namespace std::chrono_literals; auto push_consumer(std::shared_ptr conn, int expected) -> net::awaitable { @@ -74,13 +77,12 @@ auto async_echo_stress() -> net::awaitable for (int i = 0; i < sessions; ++i) net::co_spawn(ex, echo_session(conn, std::to_string(i), msgs), net::detached); - co_await connect(conn, "127.0.0.1", "6379"); - co_await conn->async_run(); + co_await async_run(*conn); } BOOST_AUTO_TEST_CASE(echo_stress) { - run(async_echo_stress()); + start(async_echo_stress()); } #else diff --git a/tests/conn_exec.cpp b/tests/conn_exec.cpp index 71ccf97c..37606271 100644 --- a/tests/conn_exec.cpp +++ b/tests/conn_exec.cpp @@ -28,6 +28,8 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore; using boost::redis::ignore_t; +using boost::redis::async_run; +using namespace std::chrono_literals; BOOST_AUTO_TEST_CASE(hello_priority) { @@ -47,9 +49,7 @@ BOOST_AUTO_TEST_CASE(hello_priority) net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); bool seen1 = false; bool seen2 = false; @@ -77,7 +77,7 @@ BOOST_AUTO_TEST_CASE(hello_priority) seen3 = true; }); - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_TEST(!ec); }); @@ -94,14 +94,12 @@ BOOST_AUTO_TEST_CASE(wrong_response_data_type) response resp; net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); conn.async_exec(req, resp, [](auto ec, auto){ BOOST_CHECK_EQUAL(ec, boost::redis::error::not_a_number); }); - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted); }); diff --git a/tests/conn_exec_cancel.cpp b/tests/conn_exec_cancel.cpp index 228d9659..ee195f5d 100644 --- a/tests/conn_exec_cancel.cpp +++ b/tests/conn_exec_cancel.cpp @@ -14,7 +14,7 @@ #include #include #include "common.hpp" -#include "../examples/common/common.hpp" +#include "../examples/start.hpp" // NOTE1: Sends hello separately. I have observed that if hello and // blpop are sent toguether, Redis will send the response of hello @@ -30,6 +30,9 @@ using boost::redis::response; using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::ignore_t; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; +using namespace std::chrono_literals; auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable { @@ -37,9 +40,8 @@ auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable generic_response gresp; auto conn = std::make_shared(ex); - co_await connect(conn, "127.0.0.1", "6379"); - conn->async_run([conn](auto ec) { + async_run(*conn, "127.0.0.1", "6379", 10s, 10s, [conn](auto ec) { std::cout << "async_run: " << ec.message() << std::endl; BOOST_TEST(!ec); }); @@ -85,11 +87,10 @@ auto ignore_implicit_cancel_of_req_written() -> net::awaitable { auto ex = co_await net::this_coro::executor; auto conn = std::make_shared(ex); - co_await connect(conn, "127.0.0.1", "6379"); // Calls async_run separately from the group of ops below to avoid // having it canceled when the timer fires. - conn->async_run([conn](auto ec) { + async_run(*conn, "127.0.0.1", "6379", 10s, 10s, [conn](auto ec) { BOOST_CHECK_EQUAL(ec, net::error::basic_errors::operation_aborted); }); @@ -118,20 +119,18 @@ auto ignore_implicit_cancel_of_req_written() -> net::awaitable BOOST_AUTO_TEST_CASE(test_ignore_explicit_cancel_of_req_written) { - run(async_ignore_explicit_cancel_of_req_written()); + start(async_ignore_explicit_cancel_of_req_written()); } BOOST_AUTO_TEST_CASE(test_ignore_implicit_cancel_of_req_written) { - run(ignore_implicit_cancel_of_req_written()); + start(ignore_implicit_cancel_of_req_written()); } BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled) { net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); request req0; req0.push("HELLO", 3); @@ -156,7 +155,7 @@ BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled) conn.async_exec(req0, ignore, c0); - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_CHECK_EQUAL(ec, net::error::basic_errors::operation_aborted); }); diff --git a/tests/conn_exec_error.cpp b/tests/conn_exec_error.cpp index 000b54ac..61b0a94e 100644 --- a/tests/conn_exec_error.cpp +++ b/tests/conn_exec_error.cpp @@ -27,6 +27,8 @@ using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::error; +using boost::redis::async_run; +using namespace std::chrono_literals; BOOST_AUTO_TEST_CASE(no_ignore_error) { @@ -37,16 +39,13 @@ BOOST_AUTO_TEST_CASE(no_ignore_error) net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); conn.async_exec(req, ignore, [&](auto ec, auto){ BOOST_CHECK_EQUAL(ec, error::resp3_simple_error); conn.cancel(redis::operation::run); }); - - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted); }); @@ -66,9 +65,7 @@ BOOST_AUTO_TEST_CASE(has_diagnostic) net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); response resp; conn.async_exec(req, resp, [&](auto ec, auto){ @@ -87,8 +84,7 @@ BOOST_AUTO_TEST_CASE(has_diagnostic) conn.cancel(redis::operation::run); }); - - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted); }); @@ -111,9 +107,7 @@ BOOST_AUTO_TEST_CASE(resp3_error_in_cmd_pipeline) response resp2; net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); auto c2 = [&](auto ec, auto) { @@ -139,8 +133,7 @@ BOOST_AUTO_TEST_CASE(resp3_error_in_cmd_pipeline) }; conn.async_exec(req1, resp1, c1); - - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted); }); @@ -171,9 +164,7 @@ BOOST_AUTO_TEST_CASE(error_in_transaction) net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); conn.async_exec(req, resp, [&](auto ec, auto){ BOOST_TEST(!ec); @@ -205,8 +196,7 @@ BOOST_AUTO_TEST_CASE(error_in_transaction) conn.cancel(redis::operation::run); }); - - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted); }); @@ -226,9 +216,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax) req2.push("SUBSCRIBE"); // Wrong command synthax. net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); auto c2 = [&](auto ec, auto) { @@ -258,8 +246,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax) }; conn.async_receive(gresp, c3); - - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ std::cout << "async_run" << std::endl; BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted); }); diff --git a/tests/conn_exec_retry.cpp b/tests/conn_exec_retry.cpp index c47808a0..7371a995 100644 --- a/tests/conn_exec_retry.cpp +++ b/tests/conn_exec_retry.cpp @@ -23,6 +23,8 @@ using boost::redis::operation; using boost::redis::request; using boost::redis::response; using boost::redis::ignore; +using boost::redis::async_run; +using namespace std::chrono_literals; BOOST_AUTO_TEST_CASE(request_retry_false) { @@ -53,9 +55,6 @@ BOOST_AUTO_TEST_CASE(request_retry_false) conn.cancel(operation::run); }); - auto const endpoints = resolve(); - net::connect(conn.next_layer(), endpoints); - auto c2 = [&](auto ec, auto){ BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); }; @@ -72,7 +71,7 @@ BOOST_AUTO_TEST_CASE(request_retry_false) conn.async_exec(req0, ignore, c0); - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); }); @@ -112,9 +111,6 @@ BOOST_AUTO_TEST_CASE(request_retry_true) conn.cancel(boost::redis::operation::run); }); - auto const endpoints = resolve(); - net::connect(conn.next_layer(), endpoints); - auto c3 = [&](auto ec, auto){ BOOST_TEST(!ec); }; @@ -136,14 +132,13 @@ BOOST_AUTO_TEST_CASE(request_retry_true) conn.async_exec(req0, ignore, c0); - conn.async_run([&](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){ // The first cacellation. BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); conn.reset_stream(); // Reconnects and runs again to test req3. - net::connect(conn.next_layer(), endpoints); - conn.async_run([&](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ std::cout << ec.message() << std::endl; BOOST_TEST(!ec); }); diff --git a/tests/conn_push.cpp b/tests/conn_push.cpp index a097f2cb..53c969a7 100644 --- a/tests/conn_push.cpp +++ b/tests/conn_push.cpp @@ -28,13 +28,99 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore; using boost::redis::ignore_t; +using boost::redis::async_run; +using namespace std::chrono_literals; + +BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) +{ + request req1; + req1.push("HELLO", 3); + req1.push("PING", "Message1"); + + request req2; + req2.push("SUBSCRIBE", "channel"); + + request req3; + req3.push("PING", "Message2"); + req3.push("QUIT"); + + net::io_context ioc; + + connection conn{ioc}; + + auto c3 =[](auto ec, auto...) + { + BOOST_TEST(!!ec); + }; + + auto c2 =[&](auto ec, auto...) + { + BOOST_TEST(!ec); + conn.async_exec(req3, ignore, c3); + }; + + auto c1 =[&](auto ec, auto...) + { + BOOST_TEST(!ec); + conn.async_exec(req2, ignore, c2); + }; + + conn.async_exec(req1, ignore, c1); + + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){ + BOOST_TEST(!ec); + conn.cancel(operation::receive); + }); + + bool push_received = false; + conn.async_receive(ignore, [&](auto ec, auto){ + std::cout << "async_receive" << std::endl; + BOOST_TEST(!ec); + conn.cancel(operation::run); + push_received = true; + }); + + ioc.run(); + + BOOST_TEST(push_received); +} + +BOOST_AUTO_TEST_CASE(push_received1) +{ + net::io_context ioc; + connection conn{ioc}; + + request req; + req.push("HELLO", 3); + req.push("SUBSCRIBE", "channel"); + + conn.async_exec(req, ignore, [](auto ec, auto){ + std::cout << "async_exec" << std::endl; + BOOST_TEST(!ec); + }); + + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){ + std::cout << "async_run: " << ec.message() << std::endl; + conn.cancel(operation::receive); + }); + + bool push_received = false; + conn.async_receive(ignore, [&](auto ec, auto){ + std::cout << "async_receive" << std::endl; + BOOST_TEST(!ec); + conn.cancel(operation::run); + push_received = true; + }); + + ioc.run(); + + BOOST_TEST(push_received); +} BOOST_AUTO_TEST_CASE(push_filtered_out) { net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); request req; req.push("HELLO", 3); @@ -51,7 +137,7 @@ BOOST_AUTO_TEST_CASE(push_filtered_out) BOOST_TEST(!ec); }); - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){ BOOST_TEST(!ec); }); @@ -101,9 +187,7 @@ auto boost_redis_adapt(response_error_tag&) BOOST_AUTO_TEST_CASE(test_push_adapter) { net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); request req; req.push("HELLO", 3); @@ -119,7 +203,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) BOOST_CHECK_EQUAL(ec, net::experimental::error::channel_errors::channel_cancelled); }); - conn.async_run([](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){ BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); }); @@ -131,94 +215,9 @@ BOOST_AUTO_TEST_CASE(test_push_adapter) net::awaitable push_consumer3(connection& conn) { - for (;;) + for (;;) { co_await conn.async_receive(ignore, net::use_awaitable); -} - -BOOST_AUTO_TEST_CASE(push_received1) -{ - net::io_context ioc; - auto const endpoints = resolve(); - connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); - - request req; - req.push("HELLO", 3); - req.push("SUBSCRIBE", "channel"); - req.push("QUIT"); - - conn.async_exec(req, ignore, [](auto ec, auto){ - BOOST_TEST(!ec); - }); - - conn.async_run([&](auto ec){ - BOOST_TEST(!ec); - conn.cancel(operation::receive); - }); - - bool push_received = false; - net::co_spawn( - ioc.get_executor(), - push_consumer1(conn, push_received), - net::detached); - - ioc.run(); - - BOOST_TEST(push_received); -} - -BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) -{ - request req1; - req1.push("HELLO", 3); - req1.push("PING", "Message1"); - - request req2; - req2.push("SUBSCRIBE", "channel"); - - request req3; - req3.push("PING", "Message2"); - req3.push("QUIT"); - - net::io_context ioc; - - auto const endpoints = resolve(); - connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); - - auto c3 =[](auto ec, auto...) - { - BOOST_TEST(!ec); - }; - - auto c2 =[&](auto ec, auto...) - { - BOOST_TEST(!ec); - conn.async_exec(req3, ignore, c3); - }; - - auto c1 =[&](auto ec, auto...) - { - BOOST_TEST(!ec); - conn.async_exec(req2, ignore, c2); - }; - - conn.async_exec(req1, ignore, c1); - - conn.async_run([&](auto ec) { - BOOST_TEST(!ec); - conn.cancel(operation::receive); - }); - - bool push_received = false; - net::co_spawn( - ioc.get_executor(), - push_consumer1(conn, push_received), - net::detached); - - ioc.run(); - - BOOST_TEST(push_received); + } } BOOST_AUTO_TEST_CASE(many_subscribers) @@ -240,12 +239,11 @@ BOOST_AUTO_TEST_CASE(many_subscribers) req3.push("QUIT"); net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); auto c11 =[&](auto ec, auto...) { + std::cout << "quit sent" << std::endl; BOOST_TEST(!ec); }; auto c10 =[&](auto ec, auto...) @@ -306,7 +304,7 @@ BOOST_AUTO_TEST_CASE(many_subscribers) conn.async_exec(req0, ignore, c0); - conn.async_run([&](auto ec) { + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){ BOOST_TEST(!ec); conn.cancel(operation::receive); }); diff --git a/tests/conn_quit.cpp b/tests/conn_quit.cpp index 4c1d2c2b..2d1f57c7 100644 --- a/tests/conn_quit.cpp +++ b/tests/conn_quit.cpp @@ -23,6 +23,8 @@ using operation = boost::redis::operation; using boost::redis::request; using boost::redis::response; using boost::redis::ignore; +using boost::redis::async_run; +using namespace std::chrono_literals; BOOST_AUTO_TEST_CASE(test_quit1) { @@ -32,15 +34,13 @@ BOOST_AUTO_TEST_CASE(test_quit1) req.push("QUIT"); net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); conn.async_exec(req, ignore, [](auto ec, auto) { BOOST_TEST(!ec); }); - conn.async_run([](auto ec) { + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){ BOOST_TEST(!ec); }); @@ -52,9 +52,7 @@ BOOST_AUTO_TEST_CASE(test_quit2) { net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); request req1; req1.get_config().cancel_on_connection_lost = false; @@ -93,7 +91,7 @@ BOOST_AUTO_TEST_CASE(test_quit2) conn.async_exec(req1, ignore, c1); - conn.async_run([&](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){ BOOST_TEST(!ec); }); diff --git a/tests/conn_reconnect.cpp b/tests/conn_reconnect.cpp index f07264f0..813c2d7c 100644 --- a/tests/conn_reconnect.cpp +++ b/tests/conn_reconnect.cpp @@ -14,13 +14,16 @@ #include #include #include "common.hpp" -#include "../examples/common/common.hpp" +#include "../examples/start.hpp" namespace net = boost::asio; using error_code = boost::system::error_code; using boost::redis::request; using boost::redis::response; using boost::redis::ignore; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; +using namespace std::chrono_literals; #include using namespace boost::asio::experimental::awaitable_operators; @@ -32,16 +35,14 @@ net::awaitable test_reconnect_impl() request req; req.push("QUIT"); - auto const endpoints = resolve(); connection conn{ex}; int i = 0; for (; i < 5; ++i) { boost::system::error_code ec1, ec2; - net::connect(conn.next_layer(), endpoints); co_await ( conn.async_exec(req, ignore, net::redirect_error(net::use_awaitable, ec1)) && - conn.async_run(net::redirect_error(net::use_awaitable, ec2)) + async_run(conn, "127.0.0.1", "6379", 10s, 10s, net::redirect_error(net::use_awaitable, ec2)) ); BOOST_TEST(!ec1); @@ -77,11 +78,10 @@ auto async_test_reconnect_timeout() -> net::awaitable req1.push("HELLO", 3); req1.push("BLPOP", "any", 0); - co_await connect(conn, "127.0.0.1", "6379"); st.expires_after(std::chrono::seconds{1}); co_await ( conn->async_exec(req1, ignore, redir(ec1)) || - conn->async_run(redir(ec2)) || + async_run(*conn, "127.0.0.1", "6379", 10s, 10s, redir(ec2)) || st.async_wait(redir(ec3)) ); @@ -96,11 +96,10 @@ auto async_test_reconnect_timeout() -> net::awaitable req2.push("HELLO", 3); req2.push("QUIT"); - co_await connect(conn, "127.0.0.1", "6379"); st.expires_after(std::chrono::seconds{1}); co_await ( conn->async_exec(req1, ignore, net::redirect_error(net::use_awaitable, ec1)) || - conn->async_run(net::redirect_error(net::use_awaitable, ec2)) || + async_run(*conn, "127.0.0.1", "6379", 10s, 10s, net::redirect_error(net::use_awaitable, ec2)) || st.async_wait(net::redirect_error(net::use_awaitable, ec3)) ); std::cout << "ccc" << std::endl; @@ -111,7 +110,7 @@ auto async_test_reconnect_timeout() -> net::awaitable BOOST_AUTO_TEST_CASE(test_reconnect_and_idle) { - run(async_test_reconnect_timeout()); + start(async_test_reconnect_timeout()); } #else int main(){} diff --git a/tests/conn_run_cancel.cpp b/tests/conn_run_cancel.cpp index 1218e56f..edc971d0 100644 --- a/tests/conn_run_cancel.cpp +++ b/tests/conn_run_cancel.cpp @@ -26,6 +26,8 @@ using net::experimental::as_tuple; using boost::redis::request; using boost::redis::response; using boost::redis::ignore; +using boost::redis::async_run; +using namespace std::chrono_literals; #include using namespace net::experimental::awaitable_operators; @@ -33,15 +35,13 @@ using namespace net::experimental::awaitable_operators; auto async_cancel_run_with_timer() -> net::awaitable { auto ex = co_await net::this_coro::executor; - auto const endpoints = resolve(); connection conn{ex}; - net::connect(conn.next_layer(), endpoints); net::steady_timer st{ex}; st.expires_after(std::chrono::seconds{1}); boost::system::error_code ec1, ec2; - co_await (conn.async_run(redir(ec1)) || st.async_wait(redir(ec2))); + co_await (async_run(conn, "127.0.0.1", "6379", 10s, 10s, redir(ec1)) || st.async_wait(redir(ec2))); BOOST_CHECK_EQUAL(ec1, boost::asio::error::basic_errors::operation_aborted); BOOST_TEST(!ec2); @@ -58,16 +58,14 @@ auto async_check_cancellation_not_missed(int n, std::chrono::milliseconds ms) -> net::awaitable { auto ex = co_await net::this_coro::executor; - auto const endpoints = resolve(); connection conn{ex}; net::steady_timer timer{ex}; for (auto i = 0; i < n; ++i) { timer.expires_after(ms); - net::connect(conn.next_layer(), endpoints); boost::system::error_code ec1, ec2; - co_await (conn.async_run(redir(ec1)) || timer.async_wait(redir(ec2))); + co_await (async_run(conn, "127.0.0.1", "6379", 10s, 10s, redir(ec1)) || timer.async_wait(redir(ec2))); BOOST_CHECK_EQUAL(ec1, boost::asio::error::basic_errors::operation_aborted); std::cout << "Counter: " << i << std::endl; } @@ -147,10 +145,7 @@ BOOST_AUTO_TEST_CASE(check_implicit_cancel_not_missed_1024) BOOST_AUTO_TEST_CASE(reset_before_run_completes) { net::io_context ioc; - auto const endpoints = resolve(); connection conn{ioc}; - net::connect(conn.next_layer(), endpoints); - // Sends a ping just as a means of waiting until we are connected. request req; @@ -161,8 +156,7 @@ BOOST_AUTO_TEST_CASE(reset_before_run_completes) BOOST_TEST(!ec); conn.reset_stream(); }); - - conn.async_run([&](auto ec){ + async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){ BOOST_CHECK_EQUAL(ec, net::error::operation_aborted); }); diff --git a/tests/conn_tls.cpp b/tests/conn_tls.cpp index 5b9f2fe0..56b85b84 100644 --- a/tests/conn_tls.cpp +++ b/tests/conn_tls.cpp @@ -23,6 +23,18 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; +using endpoints = net::ip::tcp::resolver::results_type; + +auto +resolve( + std::string const& host = "127.0.0.1", + std::string const& port = "6379") -> endpoints +{ + net::io_context ioc; + net::ip::tcp::resolver resv{ioc}; + return resv.resolve(host, port); +} + struct endpoint { std::string host; std::string port; diff --git a/tests/issue_50.cpp b/tests/issue_50.cpp index b2675a3a..cb87ca18 100644 --- a/tests/issue_50.cpp +++ b/tests/issue_50.cpp @@ -6,14 +6,15 @@ // Must come before any asio header, otherwise build fails on msvc. #include +#include #include #if defined(BOOST_ASIO_HAS_CO_AWAIT) #include -#include +#include #include -#include "../examples/common/common.hpp" +#include "../examples/start.hpp" namespace net = boost::asio; using namespace net::experimental::awaitable_operators; @@ -21,7 +22,10 @@ using steady_timer = net::use_awaitable_t<>::as_default_on_t; using boost::redis::request; using boost::redis::response; using boost::redis::ignore; -using boost::redis::experimental::async_check_health; +using boost::redis::async_check_health; +using boost::redis::async_run; +using connection = boost::asio::use_awaitable_t<>::as_default_on_t; +using namespace std::chrono_literals; // Push consumer auto receiver(std::shared_ptr conn) -> net::awaitable @@ -65,8 +69,7 @@ auto co_main(std::string host, std::string port) -> net::awaitable // The loop will reconnect on connection lost. To exit type Ctrl-C twice. for (int i = 0; i < 10; ++i) { - co_await connect(conn, host, port); - co_await ((conn->async_run() || receiver(conn) || async_check_health(*conn) || periodic_task(conn)) && + co_await ((async_run(*conn, host, port) || receiver(conn) || async_check_health(*conn) || periodic_task(conn)) && conn->async_exec(req)); conn->reset_stream(); diff --git a/tests/run.cpp b/tests/run.cpp new file mode 100644 index 00000000..4b3edc1e --- /dev/null +++ b/tests/run.cpp @@ -0,0 +1,76 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#define BOOST_TEST_MODULE run +#include + +#include +#include +#include +#include + +namespace net = boost::asio; + +using connection = boost::redis::connection; +using boost::redis::async_run; +using boost::system::error_code; +using namespace std::chrono_literals; + +bool is_host_not_found(error_code ec) +{ + if (ec == net::error::netdb_errors::host_not_found) return true; + if (ec == net::error::netdb_errors::host_not_found_try_again) return true; + return false; +} + +BOOST_AUTO_TEST_CASE(resolve_bad_host) +{ + net::io_context ioc; + + connection conn{ioc}; + async_run(conn, "Atibaia", "6379", 1000s, 1000s, [](auto ec){ + BOOST_TEST(is_host_not_found(ec)); + }); + + ioc.run(); +} + +BOOST_AUTO_TEST_CASE(resolve_with_timeout) +{ + net::io_context ioc; + + connection conn{ioc}; + async_run(conn, "Atibaia", "6379", 1ms, 1ms, [](auto ec){ + BOOST_CHECK_EQUAL(ec, boost::redis::error::resolve_timeout); + }); + + ioc.run(); +} + +BOOST_AUTO_TEST_CASE(connect_bad_port) +{ + net::io_context ioc; + + connection conn{ioc}; + async_run(conn, "127.0.0.1", "1", 1000s, 10s, [](auto ec){ + BOOST_CHECK_EQUAL(ec, net::error::basic_errors::connection_refused); + }); + + ioc.run(); +} + +BOOST_AUTO_TEST_CASE(connect_with_timeout) +{ + net::io_context ioc; + + connection conn{ioc}; + async_run(conn, "example.com", "1", 10s, 1ms, [](auto ec){ + BOOST_CHECK_EQUAL(ec, boost::redis::error::connect_timeout); + }); + + ioc.run(); +} +