From a411cc50fc9c57a33467e29107e6340b7b774da9 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 5 Jun 2022 18:26:08 +0200 Subject: [PATCH] Simplifies and enhances code modularity. --- Makefile.am | 8 +- aedis/adapter/detail/adapters.hpp | 2 +- aedis/aedis.hpp | 5 +- aedis/detail/net.hpp | 170 ++++++++++++ aedis/{generic => }/error.hpp | 13 +- aedis/generic/connection.hpp | 54 +--- aedis/generic/detail/connection_ops.hpp | 328 +++++------------------- aedis/{generic => }/impl/error.ipp | 7 +- aedis/resp3/exec.hpp | 180 +++++++++++++ aedis/{generic => resp3}/request.hpp | 10 +- aedis/src.hpp | 2 +- examples/adapter.cpp | 4 +- examples/chat_room.cpp | 4 +- examples/containers.cpp | 7 +- examples/echo_server.cpp | 4 +- examples/intro.cpp | 4 +- examples/subscriber.cpp | 4 +- tests/high_level.cpp | 26 +- tests/intro_sync.cpp | 2 +- tools/commands.cpp | 2 +- 20 files changed, 472 insertions(+), 364 deletions(-) create mode 100644 aedis/detail/net.hpp rename aedis/{generic => }/error.hpp (76%) rename aedis/{generic => }/impl/error.ipp (86%) create mode 100644 aedis/resp3/exec.hpp rename aedis/{generic => resp3}/request.hpp (97%) diff --git a/Makefile.am b/Makefile.am index c033bd9a..2cb4edf3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -52,12 +52,12 @@ endif nobase_include_HEADERS =\ $(top_srcdir)/aedis/src.hpp\ + $(top_srcdir)/aedis/error.hpp\ + $(top_srcdir)/aedis/impl/error.ipp\ + $(top_srcdir)/aedis/detail/net.hpp\ $(top_srcdir)/aedis/redis/command.hpp\ $(top_srcdir)/aedis/generic/connection.hpp\ - $(top_srcdir)/aedis/generic/request.hpp\ - $(top_srcdir)/aedis/generic/error.hpp\ $(top_srcdir)/aedis/generic/adapt.hpp\ - $(top_srcdir)/aedis/generic/impl/error.ipp\ $(top_srcdir)/aedis/generic/detail/connection_ops.hpp\ $(top_srcdir)/aedis/sentinel/command.hpp\ $(top_srcdir)/aedis/aedis.hpp\ @@ -74,7 +74,9 @@ nobase_include_HEADERS =\ $(top_srcdir)/aedis/resp3/impl/error.ipp\ $(top_srcdir)/aedis/resp3/type.hpp\ $(top_srcdir)/aedis/resp3/read.hpp\ + $(top_srcdir)/aedis/resp3/exec.hpp\ $(top_srcdir)/aedis/resp3/write.hpp\ + $(top_srcdir)/aedis/resp3/request.hpp\ $(top_srcdir)/aedis/redis/impl/command.ipp\ $(top_srcdir)/aedis/sentinel/impl/command.ipp\ $(top_srcdir)/aedis/resp3/detail/impl/parser.ipp\ diff --git a/aedis/adapter/detail/adapters.hpp b/aedis/adapter/detail/adapters.hpp index 105fee52..8d05b075 100644 --- a/aedis/adapter/detail/adapters.hpp +++ b/aedis/adapter/detail/adapters.hpp @@ -25,8 +25,8 @@ #include #include +#include #include -#include #include #include diff --git a/aedis/aedis.hpp b/aedis/aedis.hpp index 0bada7fa..27e2bd44 100644 --- a/aedis/aedis.hpp +++ b/aedis/aedis.hpp @@ -7,14 +7,15 @@ #ifndef AEDIS_HPP #define AEDIS_HPP +#include #include #include +#include +#include #include #include #include #include -#include -#include #include #include diff --git a/aedis/detail/net.hpp b/aedis/detail/net.hpp new file mode 100644 index 00000000..6c5514b0 --- /dev/null +++ b/aedis/detail/net.hpp @@ -0,0 +1,170 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef AEDIS_NET_HPP +#define AEDIS_NET_HPP + +#include + +#include +#include +#include +#include +#include + +namespace aedis { +namespace detail { + +#include + +template < + class Protocol, + class Executor, + class EndpointSequence + > +struct connect_op { + boost::asio::basic_socket* socket; + boost::asio::steady_timer* timer; + EndpointSequence* endpoints; + boost::asio::coroutine coro; + + template + void operator()( Self& self + , std::array order = {} + , boost::system::error_code ec1 = {} + , typename Protocol::endpoint const& ep = {} + , boost::system::error_code ec2 = {}) + { + reenter (coro) + { + yield + boost::asio::experimental::make_parallel_group( + [this](auto token) + { + auto f = [](boost::system::error_code const&, typename Protocol::endpoint const&) { return true; }; + return boost::asio::async_connect(*socket, *endpoints, f, token); + }, + [this](auto token) { return timer->async_wait(token);} + ).async_wait( + boost::asio::experimental::wait_for_one(), + std::move(self)); + + switch (order[0]) { + case 0: + { + if (ec1) { + self.complete(ec1, ep); + return; + } + } break; + + case 1: + { + if (!ec2) { + self.complete(error::connect_timeout, ep); + return; + } + } break; + + default: BOOST_ASSERT(false); + } + + self.complete({}, ep); + } + } +}; + +struct resolve_op { + boost::asio::ip::tcp::resolver* resv; + boost::asio::steady_timer* timer; + boost::string_view host; + boost::string_view port; + boost::asio::coroutine coro; + + template + void operator()( Self& self + , std::array order = {} + , boost::system::error_code ec1 = {} + , boost::asio::ip::tcp::resolver::results_type res = {} + , boost::system::error_code ec2 = {}) + { + reenter (coro) + { + yield + boost::asio::experimental::make_parallel_group( + [this](auto token) { return resv->async_resolve(host.data(), port.data(), token);}, + [this](auto token) { return timer->async_wait(token);} + ).async_wait( + boost::asio::experimental::wait_for_one(), + std::move(self)); + + switch (order[0]) { + case 0: + { + if (ec1) { + self.complete(ec1, {}); + return; + } + } break; + + case 1: + { + if (!ec2) { + self.complete(error::resolve_timeout, {}); + return; + } + } break; + + default: BOOST_ASSERT(false); + } + + self.complete({}, res); + } + } +}; + +#include + +template < + class Protocol, + class Executor, + class EndpointSequence, + class CompletionToken = boost::asio::default_completion_token_t + > +auto async_connect( + boost::asio::basic_socket& socket, + boost::asio::steady_timer& timer, + EndpointSequence ep, + CompletionToken&& token = boost::asio::default_completion_token_t{}) +{ + return boost::asio::async_compose + < CompletionToken + , void(boost::system::error_code, typename Protocol::endpoint const&) + >(connect_op + {&socket, &timer, &ep}, token, socket, timer); +} + +template < + class CompletionToken = + boost::asio::default_completion_token_t + > +auto async_resolve( + boost::asio::ip::tcp::resolver& resv, + boost::asio::steady_timer& timer, + boost::string_view host, + boost::string_view port, + CompletionToken&& token = CompletionToken{}) +{ + return boost::asio::async_compose + < CompletionToken + , void(boost::system::error_code, boost::asio::ip::tcp::resolver::results_type) + >(resolve_op{&resv, &timer, host, port}, token, resv, timer); +} + +} // detail +} // aedis + +#endif // AEDIS_NET_HPP diff --git a/aedis/generic/error.hpp b/aedis/error.hpp similarity index 76% rename from aedis/generic/error.hpp rename to aedis/error.hpp index d8bc6809..7830618f 100644 --- a/aedis/generic/error.hpp +++ b/aedis/error.hpp @@ -4,13 +4,12 @@ * accompanying file LICENSE.txt) */ -#ifndef AEDIS_GENERIC_ERROR_HPP -#define AEDIS_GENERIC_ERROR_HPP +#ifndef AEDIS_ERROR_HPP +#define AEDIS_ERROR_HPP #include namespace aedis { -namespace generic { /** \brief Generic errors. * \ingroup any @@ -31,9 +30,6 @@ enum class error /// Idle timeout. idle_timeout, - - /// Write stop requested. - write_stop_requested, }; /** \brief Creates a error_code object from an error. @@ -41,14 +37,13 @@ enum class error */ boost::system::error_code make_error_code(error e); -} // generic } // aedis namespace std { template<> -struct is_error_code_enum<::aedis::generic::error> : std::true_type {}; +struct is_error_code_enum<::aedis::error> : std::true_type {}; } // std -#endif // AEDIS_GENERIC_ERROR_HPP +#endif // AEDIS_ERROR_HPP diff --git a/aedis/generic/connection.hpp b/aedis/generic/connection.hpp index 8af116f2..898ef556 100644 --- a/aedis/generic/connection.hpp +++ b/aedis/generic/connection.hpp @@ -17,8 +17,8 @@ #include #include +#include #include -#include #include namespace aedis { @@ -43,19 +43,13 @@ public: using next_layer_type = AsyncReadWriteStream; /// Type of requests used by the connection. - using request_type = generic::request; + using request_type = resp3::request; using default_completion_token_type = boost::asio::default_completion_token_t; /** @brief Configuration parameters. */ struct config { - /// Ip address or name of the Redis server. - std::string host = "127.0.0.1"; - - /// Port where the Redis server is listening. - std::string port = "6379"; - /// Timeout of the \c async_resolve operation. std::chrono::milliseconds resolve_timeout = std::chrono::seconds{5}; @@ -168,12 +162,16 @@ public: * \return This function returns only when there is an error. */ template - auto async_run(CompletionToken token = CompletionToken{}) + auto + async_run( + boost::string_view host = "127.0.0.1", + boost::string_view port = "6379", + CompletionToken token = CompletionToken{}) { return boost::asio::async_compose < CompletionToken , void(boost::system::error_code) - >(detail::run_op{this}, token, resv_); + >(detail::run_op{this, host, port}, token, resv_); } /** @brief Asynchrnously schedules a command for execution. @@ -236,9 +234,7 @@ private: template friend struct detail::ping_op; template friend struct detail::run_op; template friend struct detail::exec_op; - template friend struct detail::exec_internal_impl_op; template friend struct detail::exec_internal_op; - template friend struct detail::write_op; template friend struct detail::writer_op; template friend struct detail::write_with_timeout_op; template friend struct detail::connect_with_timeout_op; @@ -268,22 +264,20 @@ private: } auto make_dynamic_buffer() - { - return boost::asio::dynamic_buffer(read_buffer_, cfg_.max_read_size); - } + { return boost::asio::dynamic_buffer(read_buffer_, cfg_.max_read_size); } - // Calls connection::async_resolve with the resolve timeout passed in - // the config. Uses the write_timer_ to perform the timeout op. template auto async_resolve_with_timeout( + boost::string_view host, + boost::string_view port, CompletionToken&& token = default_completion_token_type{}) { return boost::asio::async_compose < CompletionToken , void(boost::system::error_code) - >(detail::resolve_with_timeout_op{this}, - token, resv_.get_executor()); + >(detail::resolve_with_timeout_op{this, host, port}, + token, resv_); } // Calls connection::async_connect with a timeout. @@ -309,16 +303,6 @@ private: >(detail::reader_op{this}, token, resv_.get_executor()); } - template - auto - async_write(CompletionToken&& token = default_completion_token_type{}) - { - return boost::asio::async_compose - < CompletionToken - , void(boost::system::error_code, std::size_t) - >(detail::write_op{this}, token, resv_); - } - // Write with a timeout. template auto @@ -371,18 +355,6 @@ private: >(detail::check_idle_op{this}, token, check_idle_timer_); } - template - auto async_exec_internal_impl( - request_type const& req, - CompletionToken token = CompletionToken{}) - { - return boost::asio::async_compose - < CompletionToken - , void(boost::system::error_code) - >(detail::exec_internal_impl_op{this, &req}, - token, resv_); - } - template auto async_exec_internal( request_type const& req, diff --git a/aedis/generic/detail/connection_ops.hpp b/aedis/generic/detail/connection_ops.hpp index 05767224..c78d7756 100644 --- a/aedis/generic/detail/connection_ops.hpp +++ b/aedis/generic/detail/connection_ops.hpp @@ -9,18 +9,19 @@ #include +#include #include #include -#include #include -#include #include +#include +#include #include #include #include +#include #include -#include #include namespace aedis { @@ -30,36 +31,18 @@ namespace detail { #include template -struct exec_internal_impl_op { - Conn* cli; - typename Conn::request_type const* req; +struct connect_with_timeout_op { + Conn* conn; boost::asio::coroutine coro; template void operator()( Self& self , boost::system::error_code ec = {} - , std::size_t n = 0) + , boost::asio::ip::tcp::endpoint const& ep = {}) { reenter (coro) { - yield - boost::asio::async_write( - *cli->socket_, - boost::asio::buffer(req->payload()), - std::move(self)); - - if (ec) { - self.complete(ec); - return; - } - - yield - resp3::async_read( - *cli->socket_, - cli->make_dynamic_buffer(), - [](resp3::node const&, boost::system::error_code&) { }, - std::move(self)); - + yield aedis::detail::async_connect(*conn->socket_, conn->write_timer_, conn->endpoints_, std::move(self)); self.complete(ec); } } @@ -73,44 +56,35 @@ struct exec_internal_op { template void operator()( Self& self - , std::array order = {} - , boost::system::error_code ec1 = {} - , boost::system::error_code ec2 = {}) + , boost::system::error_code ec = {} + , std::size_t n = 0) { reenter (coro) { - // Idle timeout. - cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_interval); + yield resp3::async_exec( *cli->socket_, cli->check_idle_timer_, *req, adapter::adapt(), cli->make_dynamic_buffer(), std::move(self)); + self.complete(ec); + } + } +}; +template +struct resolve_with_timeout_op { + Conn* cli; + boost::string_view host; + boost::string_view port; + boost::asio::coroutine coro; + + template + void operator()( Self& self + , boost::system::error_code ec = {} + , boost::asio::ip::tcp::resolver::results_type res = {}) + { + reenter (coro) + { yield - boost::asio::experimental::make_parallel_group( - [this](auto token) { return cli->async_exec_internal_impl(*req, token);}, - [this](auto token) { return cli->check_idle_timer_.async_wait(token);} - ).async_wait( - boost::asio::experimental::wait_for_one(), - std::move(self)); - - switch (order[0]) { - case 0: - { - if (ec1) { - self.complete(ec1); - return; - } - } break; - - case 1: - { - if (!ec2) { - self.complete(error::idle_timeout); - return; - } - } break; - - default: BOOST_ASSERT(false); - } - - self.complete({}); + aedis::detail::async_resolve(cli->resv_, cli->write_timer_, host, port, std::move(self)); + cli->endpoints_ = res; + self.complete(ec); } } }; @@ -316,160 +290,6 @@ struct check_idle_op { } }; -template -struct resolve_with_timeout_op { - Conn* cli; - boost::asio::coroutine coro; - - template - void operator()( Self& self - , std::array order = {} - , boost::system::error_code ec1 = {} - , boost::asio::ip::tcp::resolver::results_type res = {} - , boost::system::error_code ec2 = {}) - { - reenter (coro) - { - // Tries to resolve with a timeout. We can use the writer - // timer here as there is no ongoing write operation. - cli->write_timer_.expires_after(cli->cfg_.resolve_timeout); - - yield - boost::asio::experimental::make_parallel_group( - [this](auto token) { return cli->resv_.async_resolve(cli->cfg_.host.data(), cli->cfg_.port.data(), token);}, - [this](auto token) { return cli->write_timer_.async_wait(token);} - ).async_wait( - boost::asio::experimental::wait_for_one(), - std::move(self)); - - switch (order[0]) { - case 0: - { - if (ec1) { - self.complete(ec1); - return; - } - } break; - - case 1: - { - if (!ec2) { - self.complete(error::resolve_timeout); - return; - } - } break; - - default: BOOST_ASSERT(false); - } - - cli->endpoints_ = res; - self.complete({}); - } - } -}; - -template < - class Protocol, - class Executor, - class EndpointSequence - > -struct connect_with_timeout_impl_op { - boost::asio::basic_socket* socket; - boost::asio::steady_timer* timer; - EndpointSequence* endpoints; - boost::asio::coroutine coro; - - template - void operator()( Self& self - , std::array order = {} - , boost::system::error_code ec1 = {} - , typename Protocol::endpoint const& ep = {} - , boost::system::error_code ec2 = {}) - { - reenter (coro) - { - yield - boost::asio::experimental::make_parallel_group( - [this](auto token) - { - auto f = [](boost::system::error_code const&, typename Protocol::endpoint const&) { return true; }; - return boost::asio::async_connect(*socket, *endpoints, f, token); - }, - [this](auto token) { return timer->async_wait(token);} - ).async_wait( - boost::asio::experimental::wait_for_one(), - std::move(self)); - - switch (order[0]) { - case 0: - { - if (ec1) { - self.complete(ec1, ep); - return; - } - } break; - - case 1: - { - if (!ec2) { - self.complete(error::connect_timeout, ep); - return; - } - } break; - - default: BOOST_ASSERT(false); - } - - self.complete({}, ep); - } - } -}; - -template < - class Protocol, - class Executor, - class EndpointSequence, - class CompletionToken = boost::asio::default_completion_token_t - > -auto async_connect_with_timeout( - boost::asio::basic_socket& socket, - boost::asio::steady_timer& timer, - EndpointSequence ep, - CompletionToken&& token = boost::asio::default_completion_token_t{}) -{ - return boost::asio::async_compose - < CompletionToken - , void(boost::system::error_code, typename Protocol::endpoint const&) - >(connect_with_timeout_impl_op - {&socket, &timer, &ep}, token, socket, timer); -} - -template -struct connect_with_timeout_op { - Conn* conn; - boost::asio::coroutine coro; - - template - void operator()( Self& self - , boost::system::error_code ec = {} - , boost::asio::ip::tcp::endpoint const& ep = {}) - { - reenter (coro) - { - conn->write_timer_.expires_after(conn->cfg_.connect_timeout); - - yield - async_connect_with_timeout( - *conn->socket_, - conn->write_timer_, - conn->endpoints_, - std::move(self)); - - self.complete(ec); - } - } -}; - template struct read_write_check_ping_op { Conn* cli; @@ -527,6 +347,8 @@ struct read_write_check_ping_op { template struct run_op { Conn* cli; + boost::string_view host; + boost::string_view port; boost::asio::coroutine coro; template @@ -534,7 +356,8 @@ struct run_op { { reenter (coro) { - yield cli->async_resolve_with_timeout(std::move(self)); + cli->write_timer_.expires_after(cli->cfg_.resolve_timeout); + yield cli->async_resolve_with_timeout(host, port, std::move(self)); if (ec) { self.complete(ec); return; @@ -545,6 +368,7 @@ struct run_op { typename Conn::next_layer_type >(cli->resv_.get_executor()); + cli->write_timer_.expires_after(cli->cfg_.connect_timeout); yield cli->async_connect_with_timeout(std::move(self)); if (ec) { self.complete(ec); @@ -553,6 +377,7 @@ struct run_op { cli->req_.clear(); cli->req_.push(Command::hello, 3); + cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_interval); yield cli->async_exec_internal(cli->req_, std::move(self)); if (ec) { cli->close(); @@ -571,51 +396,6 @@ struct run_op { } }; -template -struct write_op { - Conn* cli; - boost::asio::coroutine coro; - - template - void operator()( Self& self - , boost::system::error_code ec = {} - , std::size_t n = 0) - { - reenter (coro) - { - BOOST_ASSERT(!cli->reqs_.empty()); - BOOST_ASSERT(!cli->payload_next_.empty()); - - // Prepare for the next write. - cli->n_cmds_ = cli->n_cmds_next_; - cli->n_cmds_next_ = 0; - cli->payload_ = cli->payload_next_; - cli->payload_next_.clear(); - - yield - boost::asio::async_write( - *cli->socket_, - boost::asio::buffer(cli->payload_), - std::move(self)); - - BOOST_ASSERT(!cli->reqs_.empty()); - if (cli->reqs_.front()->n_cmds == 0) { - // Some requests don't have response, so their timers - // won't be canceled on read op, we have to do it here. - cli->reqs_.front()->timer.cancel_one(); - // Notice we don't have to call - // cli->wait_read_timer_.cancel_one(); as that operation - // is ongoing. - self.complete({}, n); - return; - } - - cli->payload_.clear(); - self.complete(ec, n); - } - } -}; - template struct write_with_timeout_op { Conn* cli; @@ -630,11 +410,9 @@ struct write_with_timeout_op { { reenter (coro) { - cli->write_timer_.expires_after(cli->cfg_.write_timeout); - yield boost::asio::experimental::make_parallel_group( - [this](auto token) { return cli->async_write(token);}, + [this](auto token) { return boost::asio::async_write(*cli->socket_, boost::asio::buffer(cli->payload_), token);}, [this](auto token) { return cli->write_timer_.async_wait(token);} ).async_wait( boost::asio::experimental::wait_for_one(), @@ -678,22 +456,40 @@ struct writer_op { { reenter (coro) for (;;) { - // When cli->cmds_ we are still processing the last request. - // The timer must be however canceled so we can unblock the - // channel. - if (!cli->reqs_.empty()) { + BOOST_ASSERT(!cli->reqs_.empty()); + BOOST_ASSERT(!cli->payload_next_.empty()); + + // Prepare for the next write. + cli->n_cmds_ = cli->n_cmds_next_; + cli->n_cmds_next_ = 0; + cli->payload_ = cli->payload_next_; + cli->payload_next_.clear(); + + cli->write_timer_.expires_after(cli->cfg_.write_timeout); yield cli->async_write_with_timeout(std::move(self)); if (ec) { cli->close(); self.complete(ec); return; } + + cli->payload_.clear(); + BOOST_ASSERT(!cli->reqs_.empty()); + if (cli->reqs_.front()->n_cmds == 0) { + // Some requests don't have response, so their timers + // won't be canceled on read op, we have to do it here. + cli->reqs_.front()->timer.cancel_one(); + // Notice we don't have to call + // cli->wait_read_timer_.cancel_one(); as that + // operation is ongoing. + } } yield cli->wait_write_timer_.async_wait(std::move(self)); if (!cli->socket_->is_open()) { - self.complete(error::write_stop_requested); + // The completion has been explicited requested. + self.complete({}); return; } } diff --git a/aedis/generic/impl/error.ipp b/aedis/impl/error.ipp similarity index 86% rename from aedis/generic/impl/error.ipp rename to aedis/impl/error.ipp index 6f4b0bf4..840d9f6a 100644 --- a/aedis/generic/impl/error.ipp +++ b/aedis/impl/error.ipp @@ -4,17 +4,16 @@ * accompanying file LICENSE.txt) */ -#include +#include namespace aedis { -namespace generic { namespace detail { struct error_category_impl : boost::system::error_category { char const* name() const noexcept override { - return "aedis.generic"; + return "aedis"; } std::string message(int ev) const override @@ -25,7 +24,6 @@ struct error_category_impl : boost::system::error_category { case error::read_timeout: return "Read operation timeout."; case error::write_timeout: return "Write operation timeout."; case error::idle_timeout: return "Idle timeout."; - case error::write_stop_requested: return "Write stop requested."; default: BOOST_ASSERT(false); } } @@ -44,5 +42,4 @@ boost::system::error_code make_error_code(error e) return boost::system::error_code{static_cast(e), detail::category()}; } -} // generic } // aedis diff --git a/aedis/resp3/exec.hpp b/aedis/resp3/exec.hpp new file mode 100644 index 00000000..d244fe31 --- /dev/null +++ b/aedis/resp3/exec.hpp @@ -0,0 +1,180 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef AEDIS_RESP3_EXEC_HPP +#define AEDIS_RESP3_EXEC_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace aedis { +namespace resp3 { +namespace detail { + +#include + +template < + class AsyncStream, + class Command, + class Adapter, + class DynamicBuffer + > +struct exec_op { + AsyncStream* socket; + request const* req; + Adapter adapter; + DynamicBuffer dbuf; + boost::asio::coroutine coro; + + template + void operator()( Self& self + , boost::system::error_code ec = {} + , std::size_t n = 0) + { + reenter (coro) + { + yield + boost::asio::async_write( + *socket, + boost::asio::buffer(req->payload()), + std::move(self)); + + if (ec) { + self.complete(ec, 0); + return; + } + + yield resp3::async_read(*socket, dbuf, adapter, std::move(self)); + self.complete(ec, n); + } + } +}; + +#include + +} // detail + +template < + class AsyncStream, + class Command, + class Adapter, + class DynamicBuffer, + class CompletionToken = boost::asio::default_completion_token_t + > +auto async_exec( + AsyncStream& socket, + request const& req, + Adapter adapter, + DynamicBuffer dbuf, + CompletionToken token = CompletionToken{}) +{ + return boost::asio::async_compose + < CompletionToken + , void(boost::system::error_code, std::size_t) + >(detail::exec_op + {&socket, &req, adapter, dbuf}, token, socket); +} + +namespace detail { + +#include + +template < + class AsyncStream, + class Command, + class Adapter, + class DynamicBuffer + > +struct exec_with_timeout_op { + AsyncStream* socket; + boost::asio::steady_timer* timer; + request const* req; + Adapter adapter; + DynamicBuffer dbuf; + boost::asio::coroutine coro; + + template + void operator()( Self& self + , std::array order = {} + , boost::system::error_code ec1 = {} + , std::size_t n = 0 + , boost::system::error_code ec2 = {}) + { + reenter (coro) + { + yield + boost::asio::experimental::make_parallel_group( + [this](auto token) { return resp3::async_exec(*socket, *req, adapter, dbuf, token);}, + [this](auto token) { return timer->async_wait(token);} + ).async_wait( + boost::asio::experimental::wait_for_one(), + std::move(self)); + + switch (order[0]) { + case 0: + { + if (ec1) { + self.complete(ec1, 0); + return; + } + } break; + + case 1: + { + if (!ec2) { + self.complete(aedis::error::idle_timeout, 0); + return; + } + } break; + + default: BOOST_ASSERT(false); + } + + self.complete({}, n); + } + } +}; + +#include + +} // detail + +template < + class AsyncStream, + class Command, + class Adapter, + class DynamicBuffer, + class CompletionToken = boost::asio::default_completion_token_t + > +auto async_exec( + AsyncStream& socket, + boost::asio::steady_timer& timer, + request const& req, + Adapter adapter, + DynamicBuffer dbuf, + CompletionToken token = CompletionToken{}) +{ + return boost::asio::async_compose + < CompletionToken + , void(boost::system::error_code, std::size_t) + >(detail::exec_with_timeout_op + {&socket, &timer, &req, adapter, dbuf}, token, socket, timer); +} + +} // resp3 +} // aedis + +#endif // AEDIS_RESP3_EXEC_HPP diff --git a/aedis/generic/request.hpp b/aedis/resp3/request.hpp similarity index 97% rename from aedis/generic/request.hpp rename to aedis/resp3/request.hpp index 1a8c1d68..fbca4594 100644 --- a/aedis/generic/request.hpp +++ b/aedis/resp3/request.hpp @@ -4,8 +4,8 @@ * accompanying file LICENSE.txt) */ -#ifndef AEDIS_GENERIC_REQUEST_HPP -#define AEDIS_GENERIC_REQUEST_HPP +#ifndef AEDIS_RESP3_REQUEST_HPP +#define AEDIS_RESP3_REQUEST_HPP #include #include @@ -17,7 +17,7 @@ // the value type is a pair. namespace aedis { -namespace generic { +namespace resp3 { /** @brief Creates Redis requests from user data. * \ingroup any @@ -209,7 +209,7 @@ private: std::vector commands_; }; -} // generic +} // resp3 } // aedis -#endif // AEDIS_GENERIC_SERIALIZER_HPP +#endif // AEDIS_RESP3_SERIALIZER_HPP diff --git a/aedis/src.hpp b/aedis/src.hpp index 91cfb024..6da73e03 100644 --- a/aedis/src.hpp +++ b/aedis/src.hpp @@ -4,10 +4,10 @@ * accompanying file LICENSE.txt) */ +#include #include #include #include #include #include #include -#include diff --git a/examples/adapter.cpp b/examples/adapter.cpp index 685d2f5c..53ac594d 100644 --- a/examples/adapter.cpp +++ b/examples/adapter.cpp @@ -15,8 +15,8 @@ namespace net = boost::asio; namespace adapter = aedis::adapter; +using aedis::resp3::request; using aedis::redis::command; -using aedis::generic::request; using connection = aedis::generic::connection; using node_type = aedis::resp3::node; using error_code = boost::system::error_code; @@ -50,7 +50,7 @@ int main() req.push(command::quit); db.async_exec(req, adapter, handler); - db.async_run(handler); + db.async_run("127.0.0.1", "6379", handler); ioc.run(); std::cout diff --git a/examples/chat_room.cpp b/examples/chat_room.cpp index f320520c..03f6004f 100644 --- a/examples/chat_room.cpp +++ b/examples/chat_room.cpp @@ -15,8 +15,8 @@ namespace net = boost::asio; namespace generic = aedis::generic; +using aedis::resp3::request; using aedis::redis::command; -using aedis::generic::request; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t; using connection = aedis::generic::connection; @@ -139,7 +139,7 @@ int main() // Redis client and receiver. auto db = std::make_shared(ioc); - db->async_run(handler); + db->async_run("127.0.0.1", "6379", handler); auto sessions = std::make_shared(); net::co_spawn(ioc, reader(db, sessions), net::detached); diff --git a/examples/containers.cpp b/examples/containers.cpp index c1419352..7b4a3e2b 100644 --- a/examples/containers.cpp +++ b/examples/containers.cpp @@ -17,8 +17,8 @@ namespace net = boost::asio; namespace generic = aedis::generic; using boost::optional; +using aedis::resp3::request; using aedis::redis::command; -using aedis::generic::request; using connection = aedis::generic::connection; // Response used in this example. @@ -43,7 +43,6 @@ int main() req1.push_range(command::rpush, "rpush-key", vec); req1.push_range(command::sadd, "sadd-key", set); req1.push_range(command::hset, "hset-key", map); - db.async_exec(req1, generic::adapt(), handler); // Request that retrieves the containers. request req2; @@ -63,9 +62,9 @@ int main() std::string // quit > resp; + db.async_exec(req1, generic::adapt(), handler); db.async_exec(req2, generic::adapt(resp), handler); - - db.async_run(handler); + db.async_run("127.0.0.1", "6379", handler); ioc.run(); auto const& r = std::get<4>(resp); diff --git a/examples/echo_server.cpp b/examples/echo_server.cpp index 35eb24ac..da0e1b6a 100644 --- a/examples/echo_server.cpp +++ b/examples/echo_server.cpp @@ -12,7 +12,7 @@ namespace net = boost::asio; namespace generic = aedis::generic; -using generic::request; +using aedis::resp3::request; using aedis::redis::command; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t; @@ -39,7 +39,7 @@ net::awaitable listener() { auto ex = co_await net::this_coro::executor; auto db = std::make_shared(ex); - db->async_run(net::detached); + db->async_run("127.0.0.1", "6379", net::detached); tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555}); for (;;) diff --git a/examples/intro.cpp b/examples/intro.cpp index 04f6fef1..dc7919f5 100644 --- a/examples/intro.cpp +++ b/examples/intro.cpp @@ -13,8 +13,8 @@ namespace net = boost::asio; namespace generic = aedis::generic; +using aedis::resp3::request; using aedis::redis::command; -using aedis::generic::request; using connection = generic::connection; auto handler =[](auto ec, auto...) @@ -31,7 +31,7 @@ int main() net::io_context ioc; connection db{ioc}; db.async_exec(req, generic::adapt(resp), handler); - db.async_run(handler); + db.async_run("127.0.0.1", "6379", handler); ioc.run(); std::cout << std::get<0>(resp) << std::endl; diff --git a/examples/subscriber.cpp b/examples/subscriber.cpp index 2f7eb93d..06720793 100644 --- a/examples/subscriber.cpp +++ b/examples/subscriber.cpp @@ -14,8 +14,8 @@ namespace net = boost::asio; namespace generic = aedis::generic; +using aedis::resp3::request; using aedis::redis::command; -using aedis::generic::request; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using connection = generic::connection; using response_type = std::vector>; @@ -61,6 +61,6 @@ int main() net::io_context ioc; auto db = std::make_shared(ioc); net::co_spawn(ioc, reader(db), net::detached); - db->async_run(handler); + db->async_run("127.0.0.1", "6379", handler); ioc.run(); } diff --git a/tests/high_level.cpp b/tests/high_level.cpp index be23807d..70f556d3 100644 --- a/tests/high_level.cpp +++ b/tests/high_level.cpp @@ -22,8 +22,8 @@ namespace net = boost::asio; namespace resp3 = aedis::resp3; namespace generic = aedis::generic; +using aedis::resp3::request; using aedis::redis::command; -using aedis::generic::request; using connection = aedis::generic::connection; using error_code = boost::system::error_code; using net::experimental::as_tuple; @@ -45,10 +45,8 @@ void test_resolve_error() }; net::io_context ioc; - connection::config cfg; - cfg.host = "Atibaia"; - connection db(ioc, cfg); - db.async_run(f); + connection db(ioc); + db.async_run("Atibaia", "6379", f); ioc.run(); } @@ -62,10 +60,8 @@ void test_connect_error() }; net::io_context ioc; - connection::config cfg; - cfg.port = "1"; - connection db(ioc, cfg); - db.async_run(f); + connection db(ioc); + db.async_run("127.0.0.1", "1", f); ioc.run(); } @@ -85,7 +81,7 @@ void test_quit() //expect_eq(r, 152UL); }); - db->async_run([](auto ec){ + db->async_run("127.0.0.1", "6379", [](auto ec){ expect_error(ec, net::error::misc_errors::eof); }); @@ -125,7 +121,7 @@ void test_push() net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached); - db->async_run([](auto ec){ + db->async_run("127.0.0.1", "6379", [](auto ec){ expect_error(ec, net::error::misc_errors::eof); }); @@ -143,7 +139,7 @@ net::awaitable run5(std::shared_ptr db) expect_no_error(ec); }); - auto [ec] = co_await db->async_run(as_tuple(net::use_awaitable)); + auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable)); expect_error(ec, net::error::misc_errors::eof); } @@ -154,7 +150,7 @@ net::awaitable run5(std::shared_ptr db) expect_no_error(ec); }); - auto [ec] = co_await db->async_run(as_tuple(net::use_awaitable)); + auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable)); expect_error(ec, net::error::misc_errors::eof); } } @@ -188,8 +184,8 @@ void test_idle() expect_no_error(ec); }); - db->async_run([](auto ec){ - expect_error(ec, aedis::generic::error::idle_timeout); + db->async_run("127.0.0.1", "6379", [](auto ec){ + expect_error(ec, aedis::error::idle_timeout); }); ioc.run(); diff --git a/tests/intro_sync.cpp b/tests/intro_sync.cpp index 5bf880c4..8da6b73e 100644 --- a/tests/intro_sync.cpp +++ b/tests/intro_sync.cpp @@ -15,8 +15,8 @@ namespace net = boost::asio; namespace resp3 = aedis::resp3; +using aedis::resp3::request; using aedis::redis::command; -using aedis::generic::request; using aedis::adapter::adapt; using net::ip::tcp; diff --git a/tools/commands.cpp b/tools/commands.cpp index b20dfa24..19345677 100644 --- a/tools/commands.cpp +++ b/tools/commands.cpp @@ -16,8 +16,8 @@ namespace net = boost::asio; namespace resp3 = aedis::resp3; +using aedis::resp3::request; using aedis::redis::command; -using aedis::generic::request; using aedis::resp3::node; using aedis::adapter::adapt; using net::ip::tcp;