diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index f9429e11..2f671ce0 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -31,7 +31,7 @@ namespace aedis { */ template class connection : - public connection_base< + private connection_base< typename AsyncReadWriteStream::executor_type, connection> { public: @@ -40,6 +40,10 @@ public: /// Type of the next layer using next_layer_type = AsyncReadWriteStream; + using base_type = connection_base>; + + /// List of operations that can be canceled. + using operation = typename base_type::operation; /** @brief Connection configuration parameters. */ @@ -67,6 +71,9 @@ public: : connection(ioc.get_executor()) { } + /// Returns the associated executor. + auto get_executor() {return stream_.get_executor();} + /// Resets the underlying stream. void reset_stream() { @@ -174,28 +181,117 @@ public: return base_type::async_run(ep, req, adapter, ts, std::move(token)); } + /** @brief Executes a command on the Redis server asynchronously. + * + * This function will send a request to the Redis server and + * complete when the response arrives. If the request contains + * only commands that don't expect a response, the completion + * occurs after it has been written to the underlying stream. + * Multiple concurrent calls to this function will be + * automatically queued by the implementation. + * + * @param req Request object. + * @param adapter Response adapter. + * @param token Asio completion token. + * + * For an example see echo_server.cpp. The completion token must + * have the following signature + * + * @code + * void f(boost::system::error_code, std::size_t); + * @endcode + * + * Where the second parameter is the size of the response in + * bytes. + */ + template < + class Adapter = detail::response_traits::adapter_type, + class CompletionToken = boost::asio::default_completion_token_t> + auto async_exec( + resp3::request const& req, + Adapter adapter = adapt(), + CompletionToken token = CompletionToken{}) + { + return base_type::async_exec(req, adapter, std::move(token)); + } + + /** @brief Receives server side pushes asynchronously. + * + * Users that expect server pushes should call this function in a + * loop. If a push arrives and there is no reader, the connection + * will hang and eventually timeout. + * + * @param adapter The response adapter. + * @param token The Asio completion token. + * + * For an example see subscriber.cpp. The completion token must + * have the following signature + * + * @code + * void f(boost::system::error_code, std::size_t); + * @endcode + * + * Where the second parameter is the size of the push in + * bytes. + */ + template < + class Adapter = detail::response_traits::adapter_type, + class CompletionToken = boost::asio::default_completion_token_t> + auto async_receive_push( + Adapter adapter = adapt(), + CompletionToken token = CompletionToken{}) + { + return base_type::async_receive_push(adapter, std::move(token)); + } + + /** @brief Cancel operations. + * + * @li `operation::exec`: Cancels operations started with + * `async_exec`. Has precedence over + * `request::config::close_on_connection_lost` + * @li operation::run: Cancels the `async_run` operation. Notice + * that the preferred way to close a connection is to send a + * [QUIT](https://redis.io/commands/quit/) command to the server. + * An unresponsive Redis server will also cause the idle-checks to + * timeout and lead to `connection::async_run` completing with + * `error::idle_timeout`. Calling `cancel(operation::run)` + * directly should be seen as the last option. + * @li operation::receive_push: Cancels any ongoing callto + * `async_receive_push`. + * + * @param op: The operation to be cancelled. + * @returns The number of operations that have been canceled. + */ + auto cancel(operation op) -> std::size_t + { return base_type::cancel(op); } + private: - using base_type = connection_base>; using this_type = connection; template friend class connection_base; template friend struct detail::exec_read_op; template friend struct detail::exec_op; template friend struct detail::receive_push_op; - template friend struct detail::ping_op; template friend struct detail::check_idle_op; template friend struct detail::reader_op; template friend struct detail::writer_op; - template friend struct detail::connect_with_timeout_op; + template friend struct detail::connect_with_timeout_op; template friend struct detail::run_op; + template friend struct aedis::detail::ping_op; - template - auto async_connect(timeouts ts, CompletionToken&& token) + template + auto + async_connect( + boost::asio::ip::tcp::resolver::results_type const& endpoints, + timeouts ts, + Timer& timer, + CompletionToken&& token) { return boost::asio::async_compose < CompletionToken , void(boost::system::error_code) - >(detail::connect_with_timeout_op{this, ts}, token, stream_); + >(detail::connect_with_timeout_op{this, &endpoints, ts, &timer}, + token, stream_); } void close() { stream_.close(); } diff --git a/include/aedis/connection_base.hpp b/include/aedis/connection_base.hpp index cd0ab906..42e8070e 100644 --- a/include/aedis/connection_base.hpp +++ b/include/aedis/connection_base.hpp @@ -27,8 +27,7 @@ namespace aedis { -/** @brief Base class for high level Redis asynchronous connections. - * @ingroup any +/** Base class for high level Redis asynchronous connections. * * This class is not meant to be instantiated directly but as base * class in the CRTP. @@ -40,8 +39,8 @@ namespace aedis { template class connection_base { public: - /// Executor type. using executor_type = Executor; + using this_type = connection_base; /** @brief List of async operations exposed by this class. * @@ -57,10 +56,6 @@ public: receive_push, }; - /** @brief Constructor - * - * @param ex The executor. - */ explicit connection_base(executor_type ex) : resv_{ex} , ping_timer_{ex} @@ -75,27 +70,8 @@ public: read_timer_.expires_at(std::chrono::steady_clock::time_point::max()); } - /// Returns the executor. auto get_executor() {return resv_.get_executor();} - /** @brief Cancel operations. - * - * @li `operation::exec`: Cancels operations started with - * `async_exec`. Has precedence over - * `request::config::close_on_connection_lost` - * @li operation::run: Cancels the `async_run` operation. Notice - * that the preferred way to close a connection is to send a - * [QUIT](https://redis.io/commands/quit/) command to the server. - * An unresponsive Redis server will also cause the idle-checks to - * timeout and lead to `connection::async_run` completing with - * `error::idle_timeout`. Calling `cancel(operation::run)` - * directly should be seen as the last option. - * @li operation::receive_push: Cancels any ongoing callto - * `async_receive_push`. - * - * @param op: The operation to be cancelled. - * @returns The number of operations that have been canceled. - */ auto cancel(operation op) -> std::size_t { switch (op) { @@ -142,29 +118,6 @@ public: } } - /** @brief Executes a command on the Redis server asynchronously. - * - * This function will send a request to the Redis server and - * complete when the response arrives. If the request contains - * only commands that don't expect a response, the completion - * occurs after it has been written to the underlying stream. - * Multiple concurrent calls to this function will be - * automatically queued by the implementation. - * - * @param req Request object. - * @param adapter Response adapter. - * @param token Asio completion token. - * - * For an example see echo_server.cpp. The completion token must - * have the following signature - * - * @code - * void f(boost::system::error_code, std::size_t); - * @endcode - * - * Where the second parameter is the size of the response in - * bytes. - */ template < class Adapter = detail::response_traits::adapter_type, class CompletionToken = boost::asio::default_completion_token_t> @@ -181,25 +134,6 @@ public: >(detail::exec_op{&derived(), &req, adapter}, token, resv_); } - /** @brief Receives server side pushes asynchronously. - * - * Users that expect server pushes should call this function in a - * loop. If a push arrives and there is no reader, the connection - * will hang and eventually timeout. - * - * @param adapter The response adapter. - * @param token The Asio completion token. - * - * For an example see subscriber.cpp. The completion token must - * have the following signature - * - * @code - * void f(boost::system::error_code, std::size_t); - * @endcode - * - * Where the second parameter is the size of the push in - * bytes. - */ template < class Adapter = detail::response_traits::adapter_type, class CompletionToken = boost::asio::default_completion_token_t> @@ -214,7 +148,6 @@ public: >(detail::receive_push_op{&derived(), f}, token, resv_); } -protected: template auto async_run(endpoint ep, Timeouts ts, CompletionToken token) @@ -307,7 +240,7 @@ private: return boost::asio::async_compose < CompletionToken , void(boost::system::error_code) - >(detail::resolve_with_timeout_op{&derived(), d}, + >(detail::resolve_with_timeout_op{this, d}, token, resv_); } @@ -337,7 +270,7 @@ private: return boost::asio::async_compose < CompletionToken , void(boost::system::error_code) - >(detail::start_op{&derived(), ts}, token, resv_); + >(detail::start_op{this, ts}, token, resv_); } template @@ -428,13 +361,7 @@ private: // IO objects resolver_type resv_; -protected: timer_type ping_timer_; - endpoint ep_; - // The result of async_resolve. - boost::asio::ip::tcp::resolver::results_type endpoints_; - -private: timer_type check_idle_timer_; timer_type writer_timer_; timer_type read_timer_; @@ -450,6 +377,9 @@ private: resp3::request req_; std::vector> response_; + endpoint ep_; + // The result of async_resolve. + boost::asio::ip::tcp::resolver::results_type endpoints_; }; } // aedis diff --git a/include/aedis/detail/connection_ops.hpp b/include/aedis/detail/connection_ops.hpp index e9ad63c5..37c3a5b9 100644 --- a/include/aedis/detail/connection_ops.hpp +++ b/include/aedis/detail/connection_ops.hpp @@ -30,10 +30,12 @@ namespace aedis::detail { -template +template struct connect_with_timeout_op { Conn* conn = nullptr; + boost::asio::ip::tcp::resolver::results_type const* endpoints = nullptr; typename Conn::timeouts ts; + Timer* timer = nullptr; boost::asio::coroutine coro{}; template @@ -43,10 +45,10 @@ struct connect_with_timeout_op { { reenter (coro) { - conn->ping_timer_.expires_after(ts.connect_timeout); + timer->expires_after(ts.connect_timeout); yield detail::async_connect( - conn->next_layer(), conn->ping_timer_, conn->endpoints_, std::move(self)); + conn->next_layer(), *timer, *endpoints, std::move(self)); self.complete(ec); } } @@ -425,7 +427,7 @@ struct run_op { } yield - conn->derived().async_connect(ts, std::move(self)); + conn->derived().async_connect(conn->endpoints_, ts, conn->ping_timer_, std::move(self)); if (ec) { conn->cancel(Conn::operation::run); self.complete(ec); diff --git a/include/aedis/ssl/connection.hpp b/include/aedis/ssl/connection.hpp index 20a1e477..89ae95b2 100644 --- a/include/aedis/ssl/connection.hpp +++ b/include/aedis/ssl/connection.hpp @@ -35,7 +35,7 @@ class connection; */ template class connection> : - public connection_base< + private connection_base< typename boost::asio::ssl::stream::executor_type, connection>> { public: @@ -44,6 +44,10 @@ public: /// Executor type. using executor_type = typename next_layer_type::executor_type; + using base_type = connection_base>>; + + /// List of operations that can be canceled. + using operation = typename base_type::operation; /** @brief Connection configuration parameters. */ @@ -76,6 +80,9 @@ public: : connection(ioc.get_executor(), ctx) { } + /// Returns the associated executor. + auto get_executor() {return stream_.get_executor();} + /// Reset the underlying stream. void reset_stream(boost::asio::ssl::context& ctx) { @@ -119,30 +126,71 @@ public: return base_type::async_run(ep, req, adapter, ts, std::move(token)); } + /** @brief Executes a command on the Redis server asynchronously. + * + * See aedis::connection::async_exec for detailed information. + */ + template < + class Adapter = aedis::detail::response_traits::adapter_type, + class CompletionToken = boost::asio::default_completion_token_t> + auto async_exec( + resp3::request const& req, + Adapter adapter = adapt(), + CompletionToken token = CompletionToken{}) + { + return base_type::async_exec(req, adapter, std::move(token)); + } + + /** @brief Receives server side pushes asynchronously. + * + * See aedis::connection::async_receive_push for detailed information. + */ + template < + class Adapter = aedis::detail::response_traits::adapter_type, + class CompletionToken = boost::asio::default_completion_token_t> + auto async_receive_push( + Adapter adapter = adapt(), + CompletionToken token = CompletionToken{}) + { + return base_type::async_receive_push(adapter, std::move(token)); + } + + /** @brief Cancel operations. + * + * See aedis::connection::cancel for detailed information. + */ + auto cancel(operation op) -> std::size_t + { return base_type::cancel(op); } + private: - using base_type = connection_base>>; using this_type = connection; template friend class aedis::connection_base; template friend struct aedis::detail::exec_op; - template friend struct detail::ssl_connect_with_timeout_op; + template friend struct detail::ssl_connect_with_timeout_op; template friend struct aedis::detail::run_op; template friend struct aedis::detail::writer_op; - template friend struct aedis::detail::ping_op; template friend struct aedis::detail::check_idle_op; template friend struct aedis::detail::reader_op; + template friend struct aedis::detail::exec_read_op; + template friend struct aedis::detail::ping_op; auto& lowest_layer() noexcept { return stream_.lowest_layer(); } auto is_open() const noexcept { return stream_.next_layer().is_open(); } void close() { stream_.next_layer().close(); } - template - auto async_connect(timeouts ts, CompletionToken&& token) + template + auto + async_connect( + boost::asio::ip::tcp::resolver::results_type const& endpoints, + timeouts ts, + Timer& timer, + CompletionToken&& token) { return boost::asio::async_compose < CompletionToken , void(boost::system::error_code) - >(detail::ssl_connect_with_timeout_op{this, ts}, token, stream_); + >(detail::ssl_connect_with_timeout_op{this, &endpoints, ts, &timer}, token, stream_); } next_layer_type stream_; diff --git a/include/aedis/ssl/detail/connection_ops.hpp b/include/aedis/ssl/detail/connection_ops.hpp index 54e8737f..b2718284 100644 --- a/include/aedis/ssl/detail/connection_ops.hpp +++ b/include/aedis/ssl/detail/connection_ops.hpp @@ -72,10 +72,12 @@ auto async_handshake( >(handshake_op{&stream, &timer}, token, stream, timer); } -template +template struct ssl_connect_with_timeout_op { Conn* conn = nullptr; + boost::asio::ip::tcp::resolver::results_type const* endpoints = nullptr; typename Conn::timeouts ts; + Timer* timer = nullptr; boost::asio::coroutine coro{}; template @@ -85,21 +87,21 @@ struct ssl_connect_with_timeout_op { { reenter (coro) { - conn->ping_timer_.expires_after(ts.connect_timeout); + timer->expires_after(ts.connect_timeout); yield aedis::detail::async_connect( - conn->lowest_layer(), conn->ping_timer_, conn->endpoints_, std::move(self)); + conn->lowest_layer(), *timer, *endpoints, std::move(self)); if (ec) { self.complete(ec); return; } - conn->ping_timer_.expires_after(ts.handshake_timeout); + timer->expires_after(ts.handshake_timeout); yield - async_handshake(conn->next_layer(), conn->ping_timer_, std::move(self)); + async_handshake(conn->next_layer(), *timer, std::move(self)); self.complete(ec); } }