From 82430afc8b8e1f3db448ce2aaf4167e9be852c92 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 11 Jun 2023 11:33:07 +0200 Subject: [PATCH 1/3] Make the connection non-generic on the executor type. --- examples/cpp20_chat_room.cpp | 7 +-- examples/cpp20_containers.cpp | 8 +-- examples/cpp20_echo_server.cpp | 4 +- examples/cpp20_intro.cpp | 5 +- examples/cpp20_intro_tls.cpp | 4 +- examples/cpp20_json.cpp | 4 +- examples/cpp20_protobuf.cpp | 4 +- examples/cpp20_resolve_with_sentinel.cpp | 2 +- examples/cpp20_streams.cpp | 4 +- examples/cpp20_subscriber.cpp | 5 +- include/boost/redis/connection.hpp | 62 +++++++++++++++++++++++- include/boost/redis/impl/connection.ipp | 33 +++++++++++++ include/boost/redis/src.hpp | 1 + tests/test_issue_50.cpp | 2 +- 14 files changed, 119 insertions(+), 26 deletions(-) create mode 100644 include/boost/redis/impl/connection.ipp diff --git a/examples/cpp20_chat_room.cpp b/examples/cpp20_chat_room.cpp index 489e5952..24c4a025 100644 --- a/examples/cpp20_chat_room.cpp +++ b/examples/cpp20_chat_room.cpp @@ -19,11 +19,12 @@ namespace net = boost::asio; using stream_descriptor = net::deferred_t::as_default_on_t; -using connection = net::deferred_t::as_default_on_t; using signal_set = net::deferred_t::as_default_on_t; using boost::redis::request; using boost::redis::generic_response; using boost::redis::config; +using boost::redis::connection; +using boost::redis::ignore; using net::redirect_error; using net::use_awaitable; using boost::system::error_code; @@ -41,7 +42,7 @@ receiver(std::shared_ptr conn) -> net::awaitable while (conn->will_reconnect()) { // Subscribe to channels. - co_await conn->async_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); // Loop reading Redis push messages. for (generic_response resp;;) { @@ -66,7 +67,7 @@ auto publisher(std::shared_ptr in, std::shared_ptrasync_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); msg.erase(0, n); } } diff --git a/examples/cpp20_containers.cpp b/examples/cpp20_containers.cpp index ec2ee53b..dfedd82e 100644 --- a/examples/cpp20_containers.cpp +++ b/examples/cpp20_containers.cpp @@ -20,7 +20,7 @@ using boost::redis::response; using boost::redis::ignore_t; using boost::redis::ignore; using boost::redis::config; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; void print(std::map const& cont) { @@ -47,7 +47,7 @@ auto store(std::shared_ptr conn) -> net::awaitable req.push_range("RPUSH", "rpush-key", vec); req.push_range("HSET", "hset-key", map); - co_await conn->async_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); } auto hgetall(std::shared_ptr conn) -> net::awaitable @@ -60,7 +60,7 @@ auto hgetall(std::shared_ptr conn) -> net::awaitable response> resp; // Executes the request and reads the response. - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); print(std::get<0>(resp).value()); } @@ -81,7 +81,7 @@ auto transaction(std::shared_ptr conn) -> net::awaitable response>, std::optional>> // exec > resp; - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); print(std::get<0>(std::get<3>(resp).value()).value().value()); print(std::get<1>(std::get<3>(resp).value()).value().value()); diff --git a/examples/cpp20_echo_server.cpp b/examples/cpp20_echo_server.cpp index 6d2691fb..9b637240 100644 --- a/examples/cpp20_echo_server.cpp +++ b/examples/cpp20_echo_server.cpp @@ -18,11 +18,11 @@ namespace net = boost::asio; using tcp_socket = net::deferred_t::as_default_on_t; using tcp_acceptor = net::deferred_t::as_default_on_t; using signal_set = net::deferred_t::as_default_on_t; -using connection = net::deferred_t::as_default_on_t; using boost::redis::request; using boost::redis::response; using boost::redis::config; using boost::system::error_code; +using boost::redis::connection; using namespace std::chrono_literals; auto echo_server_session(tcp_socket socket, std::shared_ptr conn) -> net::awaitable @@ -33,7 +33,7 @@ auto echo_server_session(tcp_socket socket, std::shared_ptr conn) -> for (std::string buffer;;) { auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n"); req.push("PING", buffer); - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); co_await net::async_write(socket, net::buffer(std::get<0>(resp).value())); std::get<0>(resp).value().clear(); req.clear(); diff --git a/examples/cpp20_intro.cpp b/examples/cpp20_intro.cpp index 3526950d..195122cf 100644 --- a/examples/cpp20_intro.cpp +++ b/examples/cpp20_intro.cpp @@ -17,8 +17,7 @@ namespace net = boost::asio; using boost::redis::request; using boost::redis::response; using boost::redis::config; -using boost::redis::logger; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; // Called from the main function (see main.cpp) auto co_main(config cfg) -> net::awaitable @@ -34,7 +33,7 @@ auto co_main(config cfg) -> net::awaitable response resp; // Executes the request. - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); std::cout << "PING: " << std::get<0>(resp).value() << std::endl; diff --git a/examples/cpp20_intro_tls.cpp b/examples/cpp20_intro_tls.cpp index 23b491e1..b911af27 100644 --- a/examples/cpp20_intro_tls.cpp +++ b/examples/cpp20_intro_tls.cpp @@ -18,7 +18,7 @@ using boost::redis::request; using boost::redis::response; using boost::redis::config; using boost::redis::logger; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; auto verify_certificate(bool, net::ssl::verify_context&) -> bool { @@ -45,7 +45,7 @@ auto co_main(config cfg) -> net::awaitable conn->next_layer().set_verify_mode(net::ssl::verify_peer); conn->next_layer().set_verify_callback(verify_certificate); - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); std::cout << "Response: " << std::get<0>(resp).value() << std::endl; diff --git a/examples/cpp20_json.cpp b/examples/cpp20_json.cpp index 063614b1..d0c6423c 100644 --- a/examples/cpp20_json.cpp +++ b/examples/cpp20_json.cpp @@ -29,7 +29,7 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; using boost::redis::config; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; // Struct that will be stored in Redis using json serialization. struct user { @@ -64,7 +64,7 @@ auto co_main(config cfg) -> net::awaitable response resp; - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); // Prints the first ping diff --git a/examples/cpp20_protobuf.cpp b/examples/cpp20_protobuf.cpp index 32e92396..75eb8fd2 100644 --- a/examples/cpp20_protobuf.cpp +++ b/examples/cpp20_protobuf.cpp @@ -25,7 +25,7 @@ using boost::redis::response; using boost::redis::operation; using boost::redis::ignore_t; using boost::redis::config; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; // The protobuf type described in examples/person.proto using tutorial::person; @@ -76,7 +76,7 @@ net::awaitable co_main(config cfg) response resp; // Sends the request and receives the response. - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); std::cout diff --git a/examples/cpp20_resolve_with_sentinel.cpp b/examples/cpp20_resolve_with_sentinel.cpp index 35bcf686..8401cd1e 100644 --- a/examples/cpp20_resolve_with_sentinel.cpp +++ b/examples/cpp20_resolve_with_sentinel.cpp @@ -19,7 +19,7 @@ using boost::redis::response; using boost::redis::ignore_t; using boost::redis::config; using boost::redis::address; -using connection = boost::asio::use_awaitable_t<>::as_default_on_t; +using boost::redis::connection; auto redir(boost::system::error_code& ec) { return net::redirect_error(net::use_awaitable, ec); } diff --git a/examples/cpp20_streams.cpp b/examples/cpp20_streams.cpp index 4eca3050..f0939756 100644 --- a/examples/cpp20_streams.cpp +++ b/examples/cpp20_streams.cpp @@ -25,7 +25,7 @@ using boost::redis::config; using boost::redis::generic_response; using boost::redis::operation; using boost::redis::request; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; using signal_set = net::deferred_t::as_default_on_t; auto stream_reader(std::shared_ptr conn) -> net::awaitable @@ -39,7 +39,7 @@ auto stream_reader(std::shared_ptr conn) -> net::awaitable for (;;) { req.push("XREAD", "BLOCK", "0", "STREAMS", "test-topic", stream_id); - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); // std::cout << "Response: "; // for (int i = 0; i < resp->value().size(); ++i) { diff --git a/examples/cpp20_subscriber.cpp b/examples/cpp20_subscriber.cpp index 1753095f..f7c1a4b3 100644 --- a/examples/cpp20_subscriber.cpp +++ b/examples/cpp20_subscriber.cpp @@ -24,8 +24,9 @@ using boost::redis::request; using boost::redis::generic_response; using boost::redis::logger; using boost::redis::config; +using boost::redis::ignore; using boost::system::error_code; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; using signal_set = net::deferred_t::as_default_on_t; /* This example will subscribe and read pushes indefinitely. @@ -54,7 +55,7 @@ receiver(std::shared_ptr conn) -> net::awaitable while (conn->will_reconnect()) { // Reconnect to channels. - co_await conn->async_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); // Loop reading Redis pushs messages. for (generic_response resp;;) { diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 4ea560de..db226761 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -14,10 +14,10 @@ #include #include #include +#include #include #include -#include namespace boost::redis { namespace detail @@ -187,7 +187,65 @@ private: /** \brief A connection that uses the asio::any_io_executor. * \ingroup high-level-api */ -using connection = basic_connection; +class connection { +public: + /// Executor type. + using executor_type = asio::any_io_executor; + + /// Contructs from an executor. + explicit connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client); + + /// Contructs from a context. + explicit connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client); + + executor_type get_executor() noexcept { return impl_.get_executor(); } + + template + auto async_run(config const& cfg, logger l, CompletionToken token) + { + return asio::async_initiate< + CompletionToken, void(boost::system::error_code)>( + [](auto handler, connection* self, config const* cfg, logger l) + { + self->async_run_impl(*cfg, l, std::move(handler)); + }, token, this, &cfg, l); + } + + template + auto async_receive(Response& response, CompletionToken token) + { + return impl_.async_receive(response, std::move(token)); + } + + template + auto async_exec(request const& req, Response& resp, CompletionToken token) + { + return impl_.async_exec(req, resp, std::move(token)); + } + + void cancel(operation op = operation::all); + + /// Returns true if the connection was canceled. + bool will_reconnect() const noexcept + { return impl_.will_reconnect();} + + /// Returns a reference to the next layer. + auto& next_layer() noexcept { return impl_.next_layer(); } + + /// Returns a const reference to the next layer. + auto const& next_layer() const noexcept { return impl_.next_layer(); } + + void reset_stream() { impl_.reset_stream();} + +private: + void + async_run_impl( + config const& cfg, + logger l, + asio::any_completion_handler token); + + basic_connection impl_; +}; } // boost::redis diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp new file mode 100644 index 00000000..977031a5 --- /dev/null +++ b/include/boost/redis/impl/connection.ipp @@ -0,0 +1,33 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include + +namespace boost::redis { + +connection::connection(executor_type ex, asio::ssl::context::method method) +: impl_{ex, method} +{ } + +connection::connection(asio::io_context& ioc, asio::ssl::context::method method) +: impl_(ioc.get_executor(), method) +{ } + +void +connection::async_run_impl( + config const& cfg, + logger l, + asio::any_completion_handler token) +{ + impl_.async_run(cfg, l, std::move(token)); +} + +void connection::cancel(operation op) +{ + impl_.cancel(op); +} + +} // namespace boost::redis diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index ef978afc..3a06c3e0 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include diff --git a/tests/test_issue_50.cpp b/tests/test_issue_50.cpp index 9abab044..dabffd84 100644 --- a/tests/test_issue_50.cpp +++ b/tests/test_issue_50.cpp @@ -30,10 +30,10 @@ using boost::redis::ignore; using boost::redis::logger; using boost::redis::config; using boost::redis::operation; +using boost::redis::connection; using boost::system::error_code; using boost::asio::use_awaitable; using boost::asio::redirect_error; -using connection = boost::asio::use_awaitable_t<>::as_default_on_t; using namespace std::chrono_literals; // Push consumer From d29a057fa61ea5498d0bc88a2ffa60b2ee6acf00 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Tue, 20 Jun 2023 23:01:17 +0200 Subject: [PATCH 2/3] Uses composition instead of inheritance in the connection class. --- CMakeLists.txt | 2 +- include/boost/redis/connection.hpp | 196 +++++++++++++++--- .../redis/{ => detail}/connection_base.hpp | 97 +-------- 3 files changed, 171 insertions(+), 124 deletions(-) rename include/boost/redis/{ => detail}/connection_base.hpp (88%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ba1d657..511ee395 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.14) -set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time") +#set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time") project( boost_redis diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index db226761..ccce89d2 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -7,7 +7,7 @@ #ifndef BOOST_REDIS_CONNECTION_HPP #define BOOST_REDIS_CONNECTION_HPP -#include +#include #include #include #include @@ -34,7 +34,7 @@ struct reconnection_op { BOOST_ASIO_CORO_REENTER (coro_) for (;;) { BOOST_ASIO_CORO_YIELD - conn_->async_run_one(conn_->cfg_, logger_, std::move(self)); + conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self)); conn_->cancel(operation::receive); logger_.on_connection_lost(ec); if (!conn_->will_reconnect() || is_cancelled(self)) { @@ -68,14 +68,15 @@ struct reconnection_op { * */ template -class basic_connection : public connection_base { +class basic_connection { public: - using base_type = connection_base; - using this_type = basic_connection; - /// Executor type. using executor_type = Executor; + /// Returns the underlying executor. + executor_type get_executor() noexcept + { return impl_.get_executor(); } + /// Rebinds the socket type to another executor. template struct rebind_executor @@ -87,7 +88,7 @@ public: /// Contructs from an executor. explicit basic_connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client) - : base_type{ex, method} + : impl_{ex, method} , timer_{ex} { } @@ -97,12 +98,28 @@ public: : basic_connection(ioc.get_executor(), method) { } - /** @brief High-level connection to Redis. + /** @brief Starts underlying connection operations. * - * This connection class adds reconnection functionality to - * `boost::redis::connection_base::async_run_one`. When a - * connection is lost for any reason, a new one is stablished - * automatically. To disable reconnection call + * This member function provides the following functionality + * + * 1. Resolve the address passed on `boost::redis::config::addr`. + * 2. Connect to one of the results obtained in the resolve operation. + * 3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`. + * 4. Start a health-check operation where ping commands are sent + * at intervals specified in + * `boost::redis::config::health_check_interval`. The message passed to + * `PING` will be `boost::redis::config::health_check_id`. Passing a + * timeout with value zero will disable health-checks. If the Redis + * server does not respond to a health-check within two times the value + * specified here, it will be considered unresponsive and the connection + * will be closed and a new connection will be stablished. + * 5. Starts read and write operations with the Redis + * server. More specifically it will trigger the write of all + * requests i.e. calls to `async_exec` that happened prior to this + * call. + * + * When a connection is lost for any reason, a new one is + * stablished automatically. To disable reconnection call * `boost::redis::connection::cancel(operation::reconnection)`. * * @param cfg Configuration paramters. @@ -115,11 +132,6 @@ public: * void f(system::error_code); * @endcode * - * @remarks - * - * * This function will complete only if reconnection was disabled - * and the connection is lost. - * * For example on how to call this function refer to * cpp20_intro.cpp or any other example. */ @@ -132,6 +144,8 @@ public: Logger l = Logger{}, CompletionToken token = CompletionToken{}) { + using this_type = basic_connection; + cfg_ = cfg; l.set_prefix(cfg_.log_prefix); return asio::async_compose @@ -140,6 +154,77 @@ public: >(detail::reconnection_op{this, l}, token, timer_); } + /** @brief Receives server side pushes asynchronously. + * + * When pushes arrive and there is no `async_receive` operation in + * progress, pushed data, requests, and responses will be paused + * until `async_receive` is called again. Apps will usually want + * to call `async_receive` in a loop. + * + * To cancel an ongoing receive operation apps should call + * `connection::cancel(operation::receive)`. + * + * @param response Response object. + * @param token Completion token. + * + * For an example see cpp20_subscriber.cpp. The completion token must + * have the following signature + * + * @code + * void f(system::error_code, std::size_t); + * @endcode + * + * Where the second parameter is the size of the push received in + * bytes. + */ + template < + class Response = ignore_t, + class CompletionToken = asio::default_completion_token_t + > + auto + async_receive( + Response& response, + CompletionToken token = CompletionToken{}) + { + return impl_.async_receive(response, token); + } + + /** @brief Executes commands on the Redis server asynchronously. + * + * This function sends a request to the Redis server and waits for + * the responses to each individual command in the request. If the + * request contains only commands that don't expect a response, + * the completion occurs after it has been written to the + * underlying stream. Multiple concurrent calls to this function + * will be automatically queued by the implementation. + * + * @param req Request. + * @param resp Response. + * @param token Completion token. + * + * For an example see cpp20_echo_server.cpp. The completion token must + * have the following signature + * + * @code + * void f(system::error_code, std::size_t); + * @endcode + * + * Where the second parameter is the size of the response received + * in bytes. + */ + template < + class Response = ignore_t, + class CompletionToken = asio::default_completion_token_t + > + auto + async_exec( + request const& req, + Response& resp = ignore, + CompletionToken token = CompletionToken{}) + { + return impl_.async_exec(req, resp, token); + } + /** @brief Cancel operations. * * @li `operation::exec`: Cancels operations started with @@ -152,7 +237,7 @@ public: * @param op: The operation to be cancelled. * @returns The number of operations that have been canceled. */ - void cancel(operation op = operation::all) override + void cancel(operation op = operation::all) { switch (op) { case operation::reconnection: @@ -163,16 +248,51 @@ public: default: /* ignore */; } - base_type::cancel(op); + impl_.cancel(op); } /// Returns true if the connection was canceled. bool will_reconnect() const noexcept { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();} -private: - config cfg_; + /** @brief Reserve memory on the read and write internal buffers. + * + * This function will call `std::string::reserve` on the + * underlying buffers. + * + * @param read The new capacity of the read buffer. + * @param write The new capacity of the write buffer. + */ + void reserve(std::size_t read, std::size_t write) + { + impl_.reserve(read, write); + } + /// Sets the maximum size of the read buffer. + void set_max_buffer_read_size(std::size_t max_read_size) noexcept + { impl_.set_max_buffer_read_size(max_read_size); } + + /// Returns the ssl context. + auto const& get_ssl_context() const noexcept + { return impl_.get_ssl_context();} + + /// Returns the ssl context. + auto& get_ssl_context() noexcept + { return impl_.get_ssl_context();} + + /// Resets the underlying stream. + void reset_stream() + { impl_.reset_stream(); } + + /// Returns a reference to the next layer. + auto& next_layer() noexcept + { return impl_.next_layer(); } + + /// Returns a const reference to the next layer. + auto const& next_layer() const noexcept + { return impl_.next_layer(); } + +private: using timer_type = asio::basic_waitable_timer< std::chrono::steady_clock, @@ -181,11 +301,19 @@ private: template friend struct detail::reconnection_op; + config cfg_; + detail::connection_base impl_; timer_type timer_; }; -/** \brief A connection that uses the asio::any_io_executor. +/** \brief A basic_connection that type erases the executor. * \ingroup high-level-api + * + * This connection type uses the asio::any_io_executor and + * asio::any_completion_token to reduce compilation times. + * + * For documentaiton of each member function see + * `boost::redis::basic_connection`. */ class connection { public: @@ -198,8 +326,11 @@ public: /// Contructs from a context. explicit connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client); - executor_type get_executor() noexcept { return impl_.get_executor(); } + /// Returns the underlying executor. + executor_type get_executor() noexcept + { return impl_.get_executor(); } + /// Calls `boost::redis::basic_connection::async_run`. template auto async_run(config const& cfg, logger l, CompletionToken token) { @@ -211,31 +342,38 @@ public: }, token, this, &cfg, l); } + /// Calls `boost::redis::basic_connection::async_receive`. template auto async_receive(Response& response, CompletionToken token) { return impl_.async_receive(response, std::move(token)); } + /// Calls `boost::redis::basic_connection::async_exec`. template auto async_exec(request const& req, Response& resp, CompletionToken token) { return impl_.async_exec(req, resp, std::move(token)); } + /// Calls `boost::redis::basic_connection::cancel`. void cancel(operation op = operation::all); - /// Returns true if the connection was canceled. + /// Calls `boost::redis::basic_connection::will_reconnect`. bool will_reconnect() const noexcept { return impl_.will_reconnect();} - /// Returns a reference to the next layer. - auto& next_layer() noexcept { return impl_.next_layer(); } + /// Calls `boost::redis::basic_connection::next_layer`. + auto& next_layer() noexcept + { return impl_.next_layer(); } - /// Returns a const reference to the next layer. - auto const& next_layer() const noexcept { return impl_.next_layer(); } + /// Calls `boost::redis::basic_connection::next_layer`. + auto const& next_layer() const noexcept + { return impl_.next_layer(); } - void reset_stream() { impl_.reset_stream();} + /// Calls `boost::redis::basic_connection::reset_stream`. + void reset_stream() + { impl_.reset_stream();} private: void diff --git a/include/boost/redis/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp similarity index 88% rename from include/boost/redis/connection_base.hpp rename to include/boost/redis/detail/connection_base.hpp index 554efe47..f8681769 100644 --- a/include/boost/redis/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -37,8 +37,7 @@ #include #include -namespace boost::redis { -namespace detail { +namespace boost::redis::detail { template struct wait_receive_op { @@ -473,7 +472,6 @@ struct reader_op { } } }; -} // detail /** @brief Base class for high level Redis asynchronous connections. * @ingroup high-level-api @@ -549,29 +547,6 @@ public: cancel_impl(op); } - /** @brief Executes commands on the Redis server asynchronously. - * - * This function sends a request to the Redis server and waits for - * the responses to each individual command in the request. If the - * request contains only commands that don't expect a response, - * the completion occurs after it has been written to the - * underlying stream. Multiple concurrent calls to this function - * will be automatically queued by the implementation. - * - * @param req Request. - * @param resp Response. - * @param token Completion token. - * - * For an example see cpp20_echo_server.cpp. The completion token must - * have the following signature - * - * @code - * void f(system::error_code, std::size_t); - * @endcode - * - * Where the second parameter is the size of the response received - * in bytes. - */ template < class Response = ignore_t, class CompletionToken = asio::default_completion_token_t @@ -592,29 +567,6 @@ public: >(redis::detail::exec_op{this, &req, f}, token, writer_timer_); } - /** @brief Receives server side pushes asynchronously. - * - * When pushes arrive and there is no `async_receive` operation in - * progress, pushed data, requests, and responses will be paused - * until `async_receive` is called again. Apps will usually want - * to call `async_receive` in a loop. - * - * To cancel an ongoing receive operation apps should call - * `connection::cancel(operation::receive)`. - * - * @param response Response object. - * @param token Completion token. - * - * For an example see cpp20_subscriber.cpp. The completion token must - * have the following signature - * - * @code - * void f(system::error_code, std::size_t); - * @endcode - * - * Where the second parameter is the size of the push received in - * bytes. - */ template < class Response = ignore_t, class CompletionToken = asio::default_completion_token_t @@ -634,60 +586,17 @@ public: >(redis::detail::receive_op{this, f}, token, read_op_timer_); } - /** @brief Starts underlying connection operations. - * - * Provides a high-level connection to the Redis server. It will - * perform the following steps - * - * 1. Resolve the address passed on `boost::redis::config::addr`. - * 2. Connect to one of the results obtained in the resolve operation. - * 3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`. - * 4. Start a health-check operation where ping commands are sent - * at intervals specified in - * `boost::redis::config::health_check_interval`. The message passed to - * `PING` will be `boost::redis::config::health_check_id`. Passing a - * timeout with value zero will disable health-checks. If the Redis - * server does not respond to a health-check within two times the value - * specified here, it will be considered unresponsive and the connection - * will be closed and a new connection will be stablished. - * 5. Starts read and write operations with the Redis - * server. More specifically it will trigger the write of all - * requests i.e. calls to `async_exec` that happened prior to this - * call. - * - * @param cfg Configuration paramters. - * @param l Logger object. The interface expected is specified in the class `boost::redis::logger`. - * @param token Completion token. - * - * The completion token must have the following signature - * - * @code - * void f(system::error_code); - * @endcode - * - * For example on how to call this function refer to - * cpp20_intro.cpp or any other example. - */ template - auto async_run_one(config const& cfg, Logger l, CompletionToken token) + auto async_run(config const& cfg, Logger l, CompletionToken token) { runner_.set_config(cfg); l.set_prefix(runner_.get_config().log_prefix); return runner_.async_run(*this, l, std::move(token)); } - /// Sets the maximum size of the read buffer. void set_max_buffer_read_size(std::size_t max_read_size) noexcept {max_read_size_ = max_read_size;} - /** @brief Reserve memory on the read and write internal buffers. - * - * This function will call `std::string::reserve` on the - * underlying buffers. - * - * @param read The new capacity of the read buffer. - * @param write The new capacity of the write buffer. - */ void reserve(std::size_t read, std::size_t write) { read_buffer_.reserve(read); @@ -1024,6 +933,6 @@ private: std::size_t max_read_size_ = (std::numeric_limits::max)(); }; -} // boost::redis +} // boost::redis::detail #endif // BOOST_REDIS_CONNECTION_BASE_HPP From a715c251bf97dfd5ddb03356a8ddc6ac86481800 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Fri, 23 Jun 2023 22:24:34 +0200 Subject: [PATCH 3/3] Improvements in the docs. --- README.md | 121 +++++++++++++--------------------- examples/cpp20_subscriber.cpp | 1 + 2 files changed, 48 insertions(+), 74 deletions(-) diff --git a/README.md b/README.md index 7bc5da0f..6ab6f6ec 100644 --- a/README.md +++ b/README.md @@ -1,41 +1,30 @@ # boost_redis -Boost.Redis is a [Redis](https://redis.io/) client library built on top of +Boost.Redis is a high-level [Redis](https://redis.io/) client library built on top of [Boost.Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html) that implements Redis plain text protocol [RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md). It can multiplex any number of client requests, responses, and server pushes onto a single active socket -connection to the Redis server. The library hides low-level code away -from the user, which, in the majority of the cases will be concerned -with only three library entities - -* `boost::redis::connection`: A full-duplex connection to the Redis - server with high-level functions to execute Redis commands, receive - server pushes and automatic command [pipelines](https://redis.io/docs/manual/pipelining/). -* `boost::redis::request`: A container of Redis commands that supports - STL containers and user defined data types. -* `boost::redis::response`: Container of Redis responses. - -In the next sections we will cover all these points in detail with -examples. The requirements for using Boost.Redis are +connection to the Redis server. The requirements for using Boost.Redis are * Boost 1.81 or greater. * C++17 minimum. * Redis 6 or higher (must support RESP3). * Gcc (10, 11, 12), Clang (11, 13, 14) and Visual Studio (16 2019, 17 2022). -* Have basic-level knowledge about Redis and understand Asio and its asynchronous model. +* Have basic-level knowledge about [Redis](https://redis.io/docs/) + and [Boost.Asio](https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/overview.html). -To install Boost.Redis download the latest release on -https://github.com/boostorg/redis/releases. Boost.Redis is a header only -library, so you can starting using it right away by adding the -`include` subdirectory to your project and including +The latest release can be downloaded on +https://github.com/boostorg/redis/releases. The library headers can be +found in the `include` subdirectory and a compilation of the source ```cpp #include ``` -in no more than one source file in your applications. To build the +is required. The simplest way to do it is to included this header in +no more than one source file in your applications. To build the examples and tests cmake is supported, for example ```cpp @@ -45,21 +34,10 @@ $ BOOST_ROOT=/opt/boost_1_81_0 cmake --preset g++-11 # Windows $ cmake -G "Visual Studio 17 2022" -A x64 -B bin64 -DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake ``` + ## Connection -Readers that are not familiar with Redis are advised to learn more about -it on https://redis.io/docs/ before we start, in essence - -> Redis is an open source (BSD licensed), in-memory data structure -> store used as a database, cache, message broker, and streaming -> engine. Redis provides data structures such as strings, hashes, -> lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, -> geospatial indexes, and streams. Redis has built-in replication, Lua -> scripting, LRU eviction, transactions, and different levels of -> on-disk persistence, and provides high availability via Redis -> Sentinel and automatic partitioning with Redis Cluster. - Let us start with a simple application that uses a short-lived connection to send a [ping](https://redis.io/commands/ping/) command to Redis @@ -78,7 +56,7 @@ auto co_main(config const& cfg) -> net::awaitable response resp; // Executes the request. - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); std::cout << "PING: " << std::get<0>(resp).value() << std::endl; @@ -87,12 +65,12 @@ auto co_main(config const& cfg) -> net::awaitable The roles played by the `async_run` and `async_exec` functions are -* `connection::async_exec`: Execute the commands contained in the +* `async_exec`: Execute the commands contained in the request and store the individual responses in the `resp` object. Can be called from multiple places in your code concurrently. -* `connection::async_run`: Resolve, connect, ssl-handshake, - resp3-handshake, health-checks reconnection and coordinate low-level - read and write operations.. +* `async_run`: Resolve, connect, ssl-handshake, + resp3-handshake, health-checks, reconnection and coordinate low-level + read and write operations (among other things). ### Server pushes @@ -114,22 +92,22 @@ receiver(std::shared_ptr conn) -> net::awaitable request req; req.push("SUBSCRIBE", "channel"); - while (!conn->is_cancelled()) { + // Loop while reconnection is enabled + while (conn->will_reconnect()) { // Reconnect to channels. - co_await conn->async_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); - // Loop reading Redis pushs messages. + // Loop reading Redis pushes. for (generic_response resp;;) { error_code ec; co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. - std::cout - << resp.value().at(1).value - << " " << resp.value().at(2).value - << " " << resp.value().at(3).value - << std::endl; + + // Use the response resp in some way and then clear it. + ... + resp.value().clear(); } } @@ -167,22 +145,21 @@ req.push_range("HSET", "key", map); Sending a request to Redis is performed with `boost::redis::connection::async_exec` as already stated. - - ### Config flags The `boost::redis::request::config` object inside the request dictates how the `boost::redis::connection` should handle the request in some important situations. The reader is advised to read it carefully. + ## Responses Boost.Redis uses the following strategy to support Redis responses -* **Static**: For `boost::redis::request` whose sizes are known at compile time use the `response` type. +* `boost::redis::request` is used for requests whose number of commands are not dynamic. * **Dynamic**: Otherwise use `boost::redis::generic_response`. -For example, below is a request with a compile time size +For example, the request below has three commands ```cpp request req; @@ -191,18 +168,19 @@ req.push("INCR", "key"); req.push("QUIT"); ``` -To read the response to this request users can use the following tuple +and its response also has three comamnds and can be read in the +following response object ```cpp response ``` -The pattern might have become apparent to the reader: the tuple must +The response behaves as a tuple and must have as many elements as the request has commands (exceptions below). It is also necessary that each tuple element is capable of storing the response to the command it refers to, otherwise an error will occur. To ignore responses to individual commands in the request use the tag -`boost::redis::ignore_t` +`boost::redis::ignore_t`, for example ```cpp // Ignore the second and last responses. @@ -266,18 +244,14 @@ response< Where both are passed to `async_exec` as showed elsewhere ```cpp -co_await conn->async_exec(req, resp); +co_await conn->async_exec(req, resp, net::deferred); ``` -If the intention is to ignore the response to all commands altogether -use `ignore` +If the intention is to ignore responses altogether use `ignore` ```cpp // Ignores the response -co_await conn->async_exec(req, ignore); - -// The default response argument will also ignore responses. -co_await conn->async_exec(req); +co_await conn->async_exec(req, ignore, net::deferred); ``` Responses that contain nested aggregates or heterogeneous data @@ -294,7 +268,7 @@ Commands that have no response like * `"PSUBSCRIBE"` * `"UNSUBSCRIBE"` -must be **NOT** be included in the response tuple. For example, the request below +must **NOT** be included in the response tuple. For example, the request below ```cpp request req; @@ -304,7 +278,7 @@ req.push("QUIT"); ``` must be read in this tuple `response`, -that has size two. +that has static size two. ### Null @@ -320,17 +294,17 @@ response< ... > resp; -co_await conn->async_exec(req, resp); +co_await conn->async_exec(req, resp, net::deferred); ``` Everything else stays pretty much the same. ### Transactions -To read responses to transactions we must first observe that Redis will -queue the transaction commands and send their individual responses as elements -of an array, the array is itself the response to the `EXEC` command. -For example, to read the response to this request +To read responses to transactions we must first observe that Redis +will queue the transaction commands and send their individual +responses as elements of an array, the array is itself the response to +the `EXEC` command. For example, to read the response to this request ```cpp req.push("MULTI"); @@ -360,7 +334,7 @@ response< exec_resp_type, // exec > resp; -co_await conn->async_exec(req, resp); +co_await conn->async_exec(req, resp, net::deferred); ``` For a complete example see cpp20_containers.cpp. @@ -373,8 +347,8 @@ There are cases where responses to Redis commands won't fit in the model presented above, some examples are * Commands (like `set`) whose responses don't have a fixed -RESP3 type. Expecting an `int` and receiving a blob-string -will result in error. + RESP3 type. Expecting an `int` and receiving a blob-string + will result in error. * RESP3 aggregates that contain nested aggregates can't be read in STL containers. * Transactions with a dynamic number of commands can't be read in a `response`. @@ -408,7 +382,7 @@ using other types ```cpp // Receives any RESP3 simple or aggregate data type. boost::redis::generic_response resp; -co_await conn->async_exec(req, resp); +co_await conn->async_exec(req, resp, net::deferred); ``` For example, suppose we want to retrieve a hash data structure @@ -460,9 +434,8 @@ The examples below show how to use the features discussed so far * cpp17_intro.cpp: Uses callbacks and requires C++17. * cpp17_intro_sync.cpp: Runs `async_run` in a separate thread and performs synchronous calls to `async_exec`. -To avoid repetition code that is common to some examples has been -grouped in common.hpp. The main function used in some async examples -has been factored out in the main.cpp file. +The main function used in some async examples has been factored out in +the main.cpp file. ## Echo server benchmark @@ -701,7 +674,7 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. ## Changelog -### master (incorporates changes to conform the boost review and more) +### develop (incorporates changes to conform the boost review and more) * Adds Redis stream example. diff --git a/examples/cpp20_subscriber.cpp b/examples/cpp20_subscriber.cpp index f7c1a4b3..69884705 100644 --- a/examples/cpp20_subscriber.cpp +++ b/examples/cpp20_subscriber.cpp @@ -52,6 +52,7 @@ receiver(std::shared_ptr conn) -> net::awaitable request req; req.push("SUBSCRIBE", "channel"); + // Loop while reconnection is enabled while (conn->will_reconnect()) { // Reconnect to channels.