diff --git a/README.md b/README.md index 9b2db656..af0b575b 100644 --- a/README.md +++ b/README.md @@ -87,9 +87,9 @@ them are: * [Client-side caching](https://redis.io/docs/manual/client-side-caching/). The connection class supports server pushes by means of the -`connection::async_receive` function, which can be +`connection::async_receive2` function, which can be called in the same connection that is being used to execute commands. -The coroutine below shows how to use it: +The coroutine below shows how to use it ```cpp @@ -99,26 +99,25 @@ receiver(std::shared_ptr conn) -> net::awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_response resp; + flat_tree resp; conn->set_receive_response(resp); // Loop while reconnection is enabled while (conn->will_reconnect()) { // Reconnect to channels. - co_await conn->async_exec(req, ignore); + co_await conn->async_exec(req); // Loop reading Redis pushes. - for (;;) { - error_code ec; - co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec)); + for (error_code ec;;) { + co_await conn->async_receive2(resp, redirect_error(ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. // Use the response resp in some way and then clear it. ... - consume_one(resp); + resp.clear(); } } } @@ -126,4 +125,4 @@ receiver(std::shared_ptr conn) -> net::awaitable ## Further reading -Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html). \ No newline at end of file +Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html). diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index fe9f04f1..36b83d49 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -97,9 +97,9 @@ them are: * https://redis.io/docs/manual/client-side-caching/[Client-side caching]. The connection class supports server pushes by means of the -xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive`] function, which can be +xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive2`] function, which can be called in the same connection that is being used to execute commands. -The coroutine below shows how to use it: +The coroutine below shows how to use it [source,cpp] @@ -110,26 +110,25 @@ receiver(std::shared_ptr conn) -> net::awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_response resp; + flat_tree resp; conn->set_receive_response(resp); // Loop while reconnection is enabled while (conn->will_reconnect()) { // Reconnect to channels. - co_await conn->async_exec(req, ignore); + co_await conn->async_exec(req); // Loop reading Redis pushes. - for (;;) { - error_code ec; - co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec)); + for (error_code ec;;) { + co_await conn->async_receive2(resp, redirect_error(ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. - // Use the response resp in some way and then clear it. + // Use the response here and then clear it. ... - consume_one(resp); + resp.clear(); } } } diff --git a/example/cpp20_chat_room.cpp b/example/cpp20_chat_room.cpp index e765865e..562225f5 100644 --- a/example/cpp20_chat_room.cpp +++ b/example/cpp20_chat_room.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -30,11 +30,9 @@ using boost::asio::consign; using boost::asio::detached; using boost::asio::dynamic_buffer; using boost::asio::redirect_error; -using boost::asio::use_awaitable; using boost::redis::config; using boost::redis::connection; -using boost::redis::generic_response; -using boost::redis::ignore; +using boost::redis::generic_flat_response; using boost::redis::request; using boost::system::error_code; using namespace std::chrono_literals; @@ -47,20 +45,24 @@ auto receiver(std::shared_ptr conn) -> awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_response resp; + generic_flat_response resp; conn->set_receive_response(resp); while (conn->will_reconnect()) { // Subscribe to channels. - co_await conn->async_exec(req, ignore); + co_await conn->async_exec(req); // Loop reading Redis push messages. for (error_code ec;;) { - co_await conn->async_receive(redirect_error(use_awaitable, ec)); + co_await conn->async_receive2(redirect_error(ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. - std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " " - << resp.value().at(3).value << std::endl; + + for (auto const& elem: resp.value().get_view()) + std::cout << elem.value << "\n"; + + std::cout << std::endl; + resp.value().clear(); } } @@ -74,7 +76,7 @@ auto publisher(std::shared_ptr in, std::shared_ptrasync_exec(req, ignore); + co_await conn->async_exec(req); msg.erase(0, n); } } diff --git a/example/cpp20_subscriber.cpp b/example/cpp20_subscriber.cpp index d2ec462f..1f0dd86f 100644 --- a/example/cpp20_subscriber.cpp +++ b/example/cpp20_subscriber.cpp @@ -1,11 +1,10 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) */ #include -#include #include #include @@ -22,12 +21,8 @@ namespace asio = boost::asio; using namespace std::chrono_literals; using boost::redis::request; -using boost::redis::generic_response; -using boost::redis::consume_one; -using boost::redis::logger; +using boost::redis::generic_flat_response; using boost::redis::config; -using boost::redis::ignore; -using boost::redis::error; using boost::system::error_code; using boost::redis::connection; using asio::signal_set; @@ -54,30 +49,29 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_response resp; + generic_flat_response resp; conn->set_receive_response(resp); // Loop while reconnection is enabled while (conn->will_reconnect()) { // Reconnect to the channels. - co_await conn->async_exec(req, ignore); + co_await conn->async_exec(req); - // Loop reading Redis pushs messages. + // Loop to read Redis push messages. for (error_code ec;;) { - // First tries to read any buffered pushes. - conn->receive(ec); - if (ec == error::sync_receive_push_failed) { - ec = {}; - co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec)); - } - + // Wait for pushes + co_await conn->async_receive2(asio::redirect_error(ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. - std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " " - << resp.value().at(3).value << std::endl; + // The response must be consumed without suspending the + // coroutine i.e. without the use of async operations. + for (auto const& elem: resp.value().get_view()) + std::cout << elem.value << "\n"; - consume_one(resp); + std::cout << std::endl; + + resp.value().clear(); } } } diff --git a/include/boost/redis/adapter/detail/adapters.hpp b/include/boost/redis/adapter/detail/adapters.hpp index 818cf6c0..7830f0ed 100644 --- a/include/boost/redis/adapter/detail/adapters.hpp +++ b/include/boost/redis/adapter/detail/adapters.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -12,6 +12,8 @@ #include #include #include +#include +#include #include @@ -176,6 +178,97 @@ public: } }; +template <> +class general_aggregate { +private: + resp3::tree* tree_ = nullptr; + +public: + explicit general_aggregate(resp3::tree* c = nullptr) + : tree_(c) + { } + + void on_init() { } + void on_done() { } + + template + void on_node(resp3::basic_node const& nd, system::error_code&) + { + BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer"); + + resp3::node tmp; + tmp.data_type = nd.data_type; + tmp.aggregate_size = nd.aggregate_size; + tmp.depth = nd.depth; + tmp.value = std::string{std::cbegin(nd.value), std::cend(nd.value)}; + + tree_->push_back(std::move(tmp)); + } +}; + +template <> +class general_aggregate { +private: + generic_flat_response* tree_ = nullptr; + +public: + explicit general_aggregate(generic_flat_response* c = nullptr) + : tree_(c) + { } + + void on_init() { } + void on_done() + { + BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer"); + if (tree_->has_value()) { + tree_->value().notify_done(); + } + } + + template + void on_node(resp3::basic_node const& nd, system::error_code&) + { + BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer"); + switch (nd.data_type) { + case resp3::type::blob_error: + case resp3::type::simple_error: + *tree_ = error{ + nd.data_type, + std::string{std::cbegin(nd.value), std::cend(nd.value)} + }; + break; + default: + if (tree_->has_value()) { + (**tree_).push(nd); + } + } + } +}; + +template <> +class general_aggregate { +private: + resp3::flat_tree* tree_ = nullptr; + +public: + explicit general_aggregate(resp3::flat_tree* c = nullptr) + : tree_(c) + { } + + void on_init() { } + void on_done() + { + tree_->notify_done(); + } + + template + void on_node(resp3::basic_node const& nd, system::error_code&) + { + BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer"); + tree_->push(nd); + } +}; + template class general_simple { private: diff --git a/include/boost/redis/adapter/detail/response_traits.hpp b/include/boost/redis/adapter/detail/response_traits.hpp index f24dc891..d4d8a304 100644 --- a/include/boost/redis/adapter/detail/response_traits.hpp +++ b/include/boost/redis/adapter/detail/response_traits.hpp @@ -92,8 +92,32 @@ struct response_traits> { }; template -struct response_traits, Allocator>>> { - using response_type = result, Allocator>>; +struct response_traits>> { + using response_type = result>; + using adapter_type = general_aggregate; + + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + +template +struct response_traits> { + using response_type = resp3::basic_tree; + using adapter_type = general_aggregate; + + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + +template <> +struct response_traits { + using response_type = resp3::flat_tree; + using adapter_type = general_aggregate; + + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + +template <> +struct response_traits { + using response_type = generic_flat_response; using adapter_type = general_aggregate; static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } diff --git a/include/boost/redis/adapter/detail/result_traits.hpp b/include/boost/redis/adapter/detail/result_traits.hpp index da625c99..83eddcc2 100644 --- a/include/boost/redis/adapter/detail/result_traits.hpp +++ b/include/boost/redis/adapter/detail/result_traits.hpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include #include @@ -56,12 +58,33 @@ struct result_traits>> { }; template -struct result_traits, Allocator>>> { +struct result_traits>> { using response_type = result, Allocator>>; using adapter_type = adapter::detail::general_aggregate; static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } }; +template +struct result_traits> { + using response_type = resp3::basic_tree; + using adapter_type = adapter::detail::general_aggregate; + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + +template <> +struct result_traits { + using response_type = generic_flat_response; + using adapter_type = general_aggregate; + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + +template <> +struct result_traits { + using response_type = resp3::flat_tree; + using adapter_type = general_aggregate; + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + template using adapter_t = typename result_traits>::adapter_type; diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index e83a8946..25791fea 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -207,6 +207,50 @@ struct connection_impl { { st_.mpx.set_receive_adapter(std::move(adapter)); } + + std::size_t receive(system::error_code& ec) + { + std::size_t size = 0; + + auto f = [&](system::error_code const& ec2, std::size_t n) { + ec = ec2; + size = n; + }; + + auto const res = receive_channel_.try_receive(f); + if (ec) + return 0; + + if (!res) + ec = error::sync_receive_push_failed; + + return size; + } + + template + auto async_receive2(CompletionToken&& token) + { + return + receive_channel_.async_receive( + asio::deferred( + [this](system::error_code ec, std::size_t) + { + if (!ec) { + auto f = [](system::error_code, std::size_t) { + // There is no point in checking for errors + // here since async_receive just completed + // without errors. + }; + + // We just want to drain the channel. + while (receive_channel_.try_receive(f)); + } + + return asio::deferred.values(ec); + } + ) + )(std::forward(token)); + } }; template @@ -593,7 +637,7 @@ public: return async_run(config{}, std::forward(token)); } - /** @brief Receives server side pushes asynchronously. + /** @brief (Deprecated) Receives server side pushes asynchronously. * * When pushes arrive and there is no `async_receive` operation in * progress, pushed data, requests, and responses will be paused @@ -623,12 +667,57 @@ public: * @param token Completion token. */ template > + BOOST_DEPRECATED("Please use async_receive2 instead.") auto async_receive(CompletionToken&& token = {}) { return impl_->receive_channel_.async_receive(std::forward(token)); } - /** @brief Receives server pushes synchronously without blocking. + /** @brief Wait for server pushes asynchronously + * + * This function suspends until a server push is received by the + * connection. On completion an unspecified number of pushes will + * have been added to the response object set with @ref + * boost::redis::connection::set_receive_response. + * + * To prevent receiving an unbound number of pushes the connection + * blocks further read operations on the socket when 256 pushes + * accumulate internally (we don't make any commitment to this + * exact number). When that happens any `async_exec`s and + * health-checks won't make any progress and the connection may + * eventually timeout. To avoid that Apps should call + * `async_receive2` continuously in a loop. + * + * @Note To avoid deadlocks the task (e.g. coroutine) calling + * `async_receive2` should not call `async_exec` in a way where + * they could block each other. + * + * For an example see cpp20_subscriber.cpp. The completion token + * must have the following signature + * + * @code + * void f(system::error_code); + * @endcode + * + * @par Per-operation cancellation + * This operation supports the following cancellation types: + * + * @li `asio::cancellation_type_t::terminal`. + * @li `asio::cancellation_type_t::partial`. + * @li `asio::cancellation_type_t::total`. + * + * Calling `basic_connection::cancel(operation::receive)` will + * also cancel any ongoing receive operations. + * + * @param token Completion token. + */ + template > + auto async_receive2(CompletionToken&& token = {}) + { + return impl_->async_receive2(std::forward(token)); + } + + /** @brief (Deprecated) Receives server pushes synchronously without blocking. * * Receives a server push synchronously by calling `try_receive` on * the underlying channel. If the operation fails because @@ -638,23 +727,10 @@ public: * @param ec Contains the error if any occurred. * @returns The number of bytes read from the socket. */ + BOOST_DEPRECATED("Please, use async_receive2 instead.") std::size_t receive(system::error_code& ec) { - std::size_t size = 0; - - auto f = [&](system::error_code const& ec2, std::size_t n) { - ec = ec2; - size = n; - }; - - auto const res = impl_->receive_channel_.try_receive(f); - if (ec) - return 0; - - if (!res) - ec = error::sync_receive_push_failed; - - return size; + return impl_->receive(ec); } /** @brief Executes commands on the Redis server asynchronously. @@ -837,7 +913,7 @@ public: "the other member functions to interact with the connection.") auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); } - /// Sets the response object of @ref async_receive operations. + /// Sets the response object of @ref async_receive2 operations. template void set_receive_response(Response& resp) { @@ -1028,13 +1104,25 @@ public: /// @copydoc basic_connection::async_receive template + BOOST_DEPRECATED("Please use async_receive2 instead.") auto async_receive(CompletionToken&& token = {}) { return impl_.async_receive(std::forward(token)); } + /// @copydoc basic_connection::async_receive2 + template + auto async_receive2(CompletionToken&& token = {}) + { + return impl_.async_receive2(std::forward(token)); + } + /// @copydoc basic_connection::receive - std::size_t receive(system::error_code& ec) { return impl_.receive(ec); } + BOOST_DEPRECATED("Please use async_receive2 instead.") + std::size_t receive(system::error_code& ec) + { + return impl_.impl_->receive(ec); + } /** * @brief Calls @ref boost::redis::basic_connection::async_exec. diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 506d0139..f39699f3 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -74,7 +74,7 @@ enum class error /// SSL handshake timeout ssl_handshake_timeout, - /// Can't receive push synchronously without blocking + /// (Deprecated) Can't receive push synchronously without blocking sync_receive_push_failed, /// Incompatible node depth. diff --git a/include/boost/redis/impl/flat_tree.ipp b/include/boost/redis/impl/flat_tree.ipp new file mode 100644 index 00000000..48be5040 --- /dev/null +++ b/include/boost/redis/impl/flat_tree.ipp @@ -0,0 +1,125 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Nikolai Vladimirov (nvladimirov.work@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +namespace boost::redis::resp3 { + +flat_tree::flat_tree(flat_tree const& other) +: data_{other.data_} +, view_tree_{other.view_tree_} +, ranges_{other.ranges_} +, pos_{0u} +, reallocs_{0u} +, total_msgs_{other.total_msgs_} +{ + view_tree_.resize(ranges_.size()); + set_views(); +} + +flat_tree& +flat_tree::operator=(flat_tree other) +{ + swap(*this, other); + return *this; +} + +void flat_tree::reserve(std::size_t bytes, std::size_t nodes) +{ + data_.reserve(bytes); + view_tree_.reserve(nodes); + ranges_.reserve(nodes); +} + +void flat_tree::clear() +{ + pos_ = 0u; + total_msgs_ = 0u; + reallocs_ = 0u; + data_.clear(); + view_tree_.clear(); + ranges_.clear(); +} + +void flat_tree::set_views() +{ + BOOST_ASSERT_MSG(pos_ < view_tree_.size(), "notify_done called but no nodes added."); + BOOST_ASSERT_MSG(view_tree_.size() == ranges_.size(), "Incompatible sizes."); + + for (; pos_ < view_tree_.size(); ++pos_) { + auto const& r = ranges_.at(pos_); + view_tree_.at(pos_).value = std::string_view{data_.data() + r.offset, r.size}; + } +} + +void flat_tree::notify_done() +{ + total_msgs_ += 1; + set_views(); +} + +void flat_tree::push(node_view const& node) +{ + auto data_before = data_.data(); + add_node_impl(node); + auto data_after = data_.data(); + + if (data_after != data_before) { + pos_ = 0; + reallocs_ += 1; + } +} + +void flat_tree::add_node_impl(node_view const& node) +{ + ranges_.push_back({data_.size(), node.value.size()}); + + // This must come after setting the offset above. + data_.append(node.value.data(), node.value.size()); + + view_tree_.push_back(node); +} + +void swap(flat_tree& a, flat_tree& b) +{ + using std::swap; + + swap(a.data_, b.data_); + swap(a.view_tree_, b.view_tree_); + swap(a.ranges_, b.ranges_); + swap(a.pos_, b.pos_); + swap(a.reallocs_, b.reallocs_); + swap(a.total_msgs_, b.total_msgs_); +} + +bool +operator==( + flat_tree::range const& a, + flat_tree::range const& b) +{ + return a.offset == b.offset && a.size == b.size; +} + +bool operator==(flat_tree const& a, flat_tree const& b) +{ + return + a.data_ == b.data_ && + a.view_tree_ == b.view_tree_ && + a.ranges_ == b.ranges_ && + a.pos_ == b.pos_ && + //a.reallocs_ == b.reallocs_ && + a.total_msgs_ == b.total_msgs_; +} + +bool operator!=(flat_tree const& a, flat_tree const& b) +{ + return !(a == b); +} + +} // namespace boost::redis diff --git a/include/boost/redis/resp3/flat_tree.hpp b/include/boost/redis/resp3/flat_tree.hpp new file mode 100644 index 00000000..90b23214 --- /dev/null +++ b/include/boost/redis/resp3/flat_tree.hpp @@ -0,0 +1,131 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Nikolai Vladimirov (nvladimirov.work@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_RESP3_FLAT_TREE_HPP +#define BOOST_REDIS_RESP3_FLAT_TREE_HPP + +#include +#include + +#include +#include + +namespace boost::redis { + +namespace adapter::detail { + template class general_aggregate; +} + +namespace resp3 { + +/** @brief A generic-response that stores data contiguously + * + * Similar to the @ref boost::redis::resp3::tree but data is + * stored contiguously. + */ +struct flat_tree { +public: + /// Default constructor + flat_tree() = default; + + /// Move constructor + flat_tree(flat_tree&&) noexcept = default; + + /// Copy constructor + flat_tree(flat_tree const& other); + + /// Copy assignment + flat_tree& operator=(flat_tree other); + + friend void swap(flat_tree&, flat_tree&); + + friend + bool operator==(flat_tree const&, flat_tree const&); + + friend + bool operator!=(flat_tree const&, flat_tree const&); + + /** @brief Reserve capacity + * + * Reserve memory for incoming data. + * + * @param bytes Number of bytes to reserve for data. + * @param nodes Number of nodes to reserve. + */ + void reserve(std::size_t bytes, std::size_t nodes); + + /** @brief Clear both the data and the node buffers + * + * @Note: A `boost::redis:.flat_tree` can contain the + * response to multiple Redis commands and server pushes. Calling + * this function will erase everything contained in it. + */ + void clear(); + + /// Returns the size of the data buffer + auto data_size() const noexcept -> std::size_t + { return data_.size(); } + + /// Returns the RESP3 response + auto get_view() const -> view_tree const& + { return view_tree_; } + + /** @brief Returns the number of times reallocation took place + * + * This function returns how many reallocations were performed and + * can be useful to determine how much memory to reserve upfront. + */ + auto get_reallocs() const noexcept -> std::size_t + { return reallocs_; } + + /// Returns the number of complete RESP3 messages contained in this object. + std::size_t get_total_msgs() const noexcept + { return total_msgs_; } + +private: + template friend class adapter::detail::general_aggregate; + + // Notify the object that all nodes were pushed. + void notify_done(); + + // Push a new node to the response + void push(node_view const& node); + + void add_node_impl(node_view const& node); + + void set_views(); + + // Range into the data buffer. + struct range { + std::size_t offset; + std::size_t size; + }; + + friend bool operator==(range const&, range const&); + + std::string data_; + view_tree view_tree_; + std::vector ranges_; + std::size_t pos_ = 0u; + std::size_t reallocs_ = 0u; + std::size_t total_msgs_ = 0u; +}; + +/// Swaps two responses +void swap(flat_tree&, flat_tree&); + +/// Equality operator +bool operator==(flat_tree const&, flat_tree const&); + +/// Inequality operator +bool operator!=(flat_tree const&, flat_tree const&); + +} // resp3 +} // namespace boost::redis + +#endif // BOOST_REDIS_RESP3_FLAT_TREE_HPP diff --git a/include/boost/redis/resp3/node.hpp b/include/boost/redis/resp3/node.hpp index 6e3f4aac..1e284dab 100644 --- a/include/boost/redis/resp3/node.hpp +++ b/include/boost/redis/resp3/node.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -53,6 +53,13 @@ auto operator==(basic_node const& a, basic_node const& b) // clang-format on }; +/// Inequality operator for RESP3 nodes +template +auto operator!=(basic_node const& a, basic_node const& b) +{ + return !(a == b); +}; + /// A node in the response tree that owns its data. using node = basic_node; diff --git a/include/boost/redis/resp3/tree.hpp b/include/boost/redis/resp3/tree.hpp new file mode 100644 index 00000000..fa4f4c66 --- /dev/null +++ b/include/boost/redis/resp3/tree.hpp @@ -0,0 +1,29 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_RESP3_TREE_HPP +#define BOOST_REDIS_RESP3_TREE_HPP + +#include + +#include +#include + +namespace boost::redis::resp3 { + +/// A RESP3 tree that owns its data. +template >> +using basic_tree = std::vector, Allocator>; + +/// A RESP3 tree that owns its data. +using tree = basic_tree; + +/// A RESP3 tree whose data are `std::string_views`. +using view_tree = basic_tree; + +} + +#endif // BOOST_REDIS_RESP3_RESPONSE_HPP diff --git a/include/boost/redis/response.hpp b/include/boost/redis/response.hpp index 94fdd63d..c39dc632 100644 --- a/include/boost/redis/response.hpp +++ b/include/boost/redis/response.hpp @@ -9,6 +9,8 @@ #include #include +#include +#include #include @@ -29,9 +31,12 @@ using response = std::tuple...>; * [pre-order](https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR) * view of the response tree. */ -using generic_response = adapter::result>; +using generic_response = adapter::result; -/** @brief Consume on response from a generic response +/// Similar to @ref boost::redis::generic_response but stores data contiguously. +using generic_flat_response = adapter::result; + +/** @brief (Deprecated) Consume on response from a generic response * * This function rotates the elements so that the start of the next * response becomes the new front element. For example the output of @@ -70,13 +75,15 @@ using generic_response = adapter::result>; * @param r The response to modify. * @param ec Will be populated in case of error. */ +//BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.") void consume_one(generic_response& r, system::error_code& ec); /** - * @brief Throwing overload of `consume_one`. + * @brief (Deprecated) Throwing overload of `consume_one`. * * @param r The response to modify. */ +//BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.") void consume_one(generic_response& r); } // namespace boost::redis diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 09334421..06c2dcd3 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 78a65ee1..4a9bf03c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -56,6 +56,7 @@ make_test(test_conn_run_cancel) make_test(test_conn_check_health) make_test(test_conn_exec) make_test(test_conn_push) +make_test(test_conn_push2) make_test(test_conn_monitor) make_test(test_conn_reconnect) make_test(test_conn_exec_cancel) diff --git a/test/test_any_adapter.cpp b/test/test_any_adapter.cpp index f0345ac1..ca123a49 100644 --- a/test/test_any_adapter.cpp +++ b/test/test_any_adapter.cpp @@ -13,6 +13,7 @@ #include using boost::redis::generic_response; +using boost::redis::resp3::flat_tree; using boost::redis::response; using boost::redis::ignore; using boost::redis::any_adapter; @@ -24,10 +25,12 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types) response r1; response r2; generic_response r3; + flat_tree r4; BOOST_CHECK_NO_THROW(any_adapter{r1}); BOOST_CHECK_NO_THROW(any_adapter{r2}); BOOST_CHECK_NO_THROW(any_adapter{r3}); + BOOST_CHECK_NO_THROW(any_adapter{r4}); BOOST_CHECK_NO_THROW(any_adapter{ignore}); } diff --git a/test/test_conn_echo_stress.cpp b/test/test_conn_echo_stress.cpp index 5b37489d..48157b0b 100644 --- a/test/test_conn_echo_stress.cpp +++ b/test/test_conn_echo_stress.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -27,6 +27,7 @@ using error_code = boost::system::error_code; using boost::redis::operation; using boost::redis::request; using boost::redis::response; +using boost::redis::resp3::flat_tree; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::logger; @@ -54,39 +55,44 @@ std::ostream& operator<<(std::ostream& os, usage const& u) namespace { -auto push_consumer(connection& conn, int expected) -> net::awaitable +auto +receiver( + connection& conn, + flat_tree& resp, + std::size_t expected) -> net::awaitable { - int c = 0; - for (error_code ec;;) { - conn.receive(ec); - if (ec == error::sync_receive_push_failed) { - ec = {}; - co_await conn.async_receive(net::redirect_error(ec)); - } else if (!ec) { - //std::cout << "Skipping suspension." << std::endl; - } - - if (ec) { - BOOST_TEST(false, "push_consumer error: " << ec.message()); - co_return; - } - if (++c == expected) - break; + std::size_t push_counter = 0; + while (push_counter != expected) { + co_await conn.async_receive2(); + push_counter += resp.get_total_msgs(); + resp.clear(); } conn.cancel(); } -auto echo_session(connection& conn, const request& pubs, int n) -> net::awaitable +auto echo_session(connection& conn, const request& req, std::size_t n) -> net::awaitable { - for (auto i = 0; i < n; ++i) - co_await conn.async_exec(pubs); + for (auto i = 0u; i < n; ++i) + co_await conn.async_exec(req); } void rethrow_on_error(std::exception_ptr exc) { - if (exc) + if (exc) { + BOOST_TEST(false); std::rethrow_exception(exc); + } +} + +request make_pub_req(std::size_t n_pubs) +{ + request req; + req.push("PING"); + for (std::size_t i = 0u; i < n_pubs; ++i) + req.push("PUBLISH", "channel", "payload"); + + return req; } BOOST_AUTO_TEST_CASE(echo_stress) @@ -98,22 +104,22 @@ BOOST_AUTO_TEST_CASE(echo_stress) // Number of coroutines that will send pings sharing the same // connection to redis. - constexpr int sessions = 150; + constexpr std::size_t sessions = 150u; // The number of pings that will be sent by each session. - constexpr int msgs = 200; + constexpr std::size_t msgs = 200u; // The number of publishes that will be sent by each session with // each message. - constexpr int n_pubs = 25; + constexpr std::size_t n_pubs = 25u; // This is the total number of pushes we will receive. - constexpr int total_pushes = sessions * msgs * n_pubs + 1; + constexpr std::size_t total_pushes = sessions * msgs * n_pubs + 1; - request pubs; - pubs.push("PING"); - for (int i = 0; i < n_pubs; ++i) - pubs.push("PUBLISH", "channel", "payload"); + flat_tree resp; + conn.set_receive_response(resp); + + request const pub_req = make_pub_req(n_pubs); // Run the connection bool run_finished = false, subscribe_finished = false; @@ -123,6 +129,10 @@ BOOST_AUTO_TEST_CASE(echo_stress) std::clog << "async_run finished" << std::endl; }); + // Op that will consume the pushes counting down until all expected + // pushes have been received. + net::co_spawn(ctx, receiver(conn, resp, total_pushes), rethrow_on_error); + // Subscribe, then launch the coroutines request req; req.push("SUBSCRIBE", "channel"); @@ -130,12 +140,8 @@ BOOST_AUTO_TEST_CASE(echo_stress) subscribe_finished = true; BOOST_TEST(ec == error_code()); - // Op that will consume the pushes counting down until all expected - // pushes have been received. - net::co_spawn(ctx, push_consumer(conn, total_pushes), rethrow_on_error); - - for (int i = 0; i < sessions; ++i) - net::co_spawn(ctx, echo_session(conn, pubs, msgs), rethrow_on_error); + for (std::size_t i = 0; i < sessions; ++i) + net::co_spawn(ctx, echo_session(conn, pub_req, msgs), rethrow_on_error); }); // Run the test @@ -144,7 +150,13 @@ BOOST_AUTO_TEST_CASE(echo_stress) BOOST_TEST(subscribe_finished); // Print statistics - std::cout << "-------------------\n" << conn.get_usage() << std::endl; + std::cout + << "-------------------\n" + << "Usage data: \n" + << conn.get_usage() << "\n" + << "-------------------\n" + << "Reallocations: " << resp.get_reallocs() + << std::endl; } } // namespace diff --git a/test/test_conn_exec_error.cpp b/test/test_conn_exec_error.cpp index 5a121929..b77f1299 100644 --- a/test/test_conn_exec_error.cpp +++ b/test/test_conn_exec_error.cpp @@ -269,9 +269,9 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax) generic_response gresp; conn->set_receive_response(gresp); - auto c3 = [&](error_code ec, std::size_t) { + auto c3 = [&](error_code ec) { c3_called = true; - std::cout << "async_receive" << std::endl; + std::cout << "async_receive2" << std::endl; BOOST_TEST(!ec); BOOST_TEST(gresp.has_error()); BOOST_CHECK_EQUAL(gresp.error().data_type, resp3::type::simple_error); @@ -281,7 +281,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax) conn->cancel(operation::reconnection); }; - conn->async_receive(c3); + conn->async_receive2(c3); run(conn); @@ -326,4 +326,4 @@ BOOST_AUTO_TEST_CASE(issue_287_generic_response_error_then_success) BOOST_TEST(resp.error().diagnostic == "ERR wrong number of arguments for 'set' command"); } -} // namespace \ No newline at end of file +} // namespace diff --git a/test/test_conn_monitor.cpp b/test/test_conn_monitor.cpp index 24d22ee1..090c5455 100644 --- a/test/test_conn_monitor.cpp +++ b/test/test_conn_monitor.cpp @@ -41,7 +41,7 @@ class test_monitor { void start_receive() { - conn.async_receive([this](error_code ec, std::size_t) { + conn.async_receive2([this](error_code ec) { // We should expect one push entry, at least BOOST_TEST_EQ(ec, error_code()); BOOST_TEST(monitor_resp.has_value()); @@ -118,4 +118,4 @@ int main() test_monitor{}.run(); return boost::report_errors(); -} \ No newline at end of file +} diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp new file mode 100644 index 00000000..9e3d7f62 --- /dev/null +++ b/test/test_conn_push2.cpp @@ -0,0 +1,392 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include +#include +#include + +#include +#include + +#include + +#define BOOST_TEST_MODULE conn_push +#include + +#include "common.hpp" + +#include +#include + +namespace net = boost::asio; +namespace redis = boost::redis; + +using boost::redis::operation; +using boost::redis::connection; +using boost::system::error_code; +using boost::redis::request; +using boost::redis::response; +using boost::redis::resp3::flat_tree; +using boost::redis::ignore; +using boost::redis::ignore_t; +using boost::system::error_code; +using boost::redis::logger; +using namespace std::chrono_literals; + +namespace { + +BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) +{ + request req1; + req1.push("HELLO", 3); + req1.push("PING", "Message1"); + + request req2; + req2.push("SUBSCRIBE", "channel"); + + request req3; + req3.push("PING", "Message2"); + req3.push("QUIT"); + + net::io_context ioc; + + auto conn = std::make_shared(ioc); + + bool push_received = false, c1_called = false, c2_called = false, c3_called = false; + + auto c3 = [&](error_code ec, std::size_t) { + c3_called = true; + std::cout << "c3: " << ec.message() << std::endl; + }; + + auto c2 = [&, conn](error_code ec, std::size_t) { + c2_called = true; + BOOST_TEST(ec == error_code()); + conn->async_exec(req3, ignore, c3); + }; + + auto c1 = [&, conn](error_code ec, std::size_t) { + c1_called = true; + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c2); + }; + + conn->async_exec(req1, ignore, c1); + + run(conn, make_test_config(), {}); + + conn->async_receive2([&, conn](error_code ec) { + std::cout << "async_receive2" << std::endl; + BOOST_TEST(ec == error_code()); + push_received = true; + conn->cancel(); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(push_received); + BOOST_TEST(c1_called); + BOOST_TEST(c2_called); + BOOST_TEST(c3_called); +} + +BOOST_AUTO_TEST_CASE(push_received1) +{ + net::io_context ioc; + auto conn = std::make_shared(ioc); + + flat_tree resp; + conn->set_receive_response(resp); + + // Trick: Uses SUBSCRIBE because this command has no response or + // better said, its response is a server push, which is what we + // want to test. + request req; + req.push("SUBSCRIBE", "channel1"); + req.push("SUBSCRIBE", "channel2"); + + bool push_received = false, exec_finished = false; + + conn->async_exec(req, ignore, [&, conn](error_code ec, std::size_t) { + exec_finished = true; + std::cout << "async_exec" << std::endl; + BOOST_TEST(ec == error_code()); + }); + + conn->async_receive2([&, conn](error_code ec) { + push_received = true; + std::cout << "async_receive2" << std::endl; + + BOOST_TEST(ec == error_code()); + + BOOST_CHECK_EQUAL(resp.get_total_msgs(), 2u); + + conn->cancel(); + }); + + run(conn); + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); + BOOST_TEST(push_received); +} + +BOOST_AUTO_TEST_CASE(push_filtered_out) +{ + net::io_context ioc; + auto conn = std::make_shared(ioc); + + request req; + req.push("HELLO", 3); + req.push("PING"); + req.push("SUBSCRIBE", "channel"); + req.push("QUIT"); + + response resp; + + bool exec_finished = false, push_received = false; + + conn->async_exec(req, resp, [conn, &exec_finished](error_code ec, std::size_t) { + exec_finished = true; + BOOST_TEST(ec == error_code()); + }); + + conn->async_receive2([&, conn](error_code ec) { + push_received = true; + BOOST_TEST(ec == error_code()); + conn->cancel(operation::reconnection); + }); + + run(conn); + + ioc.run_for(test_timeout); + BOOST_TEST(exec_finished); + BOOST_TEST(push_received); + + BOOST_CHECK_EQUAL(std::get<1>(resp).value(), "PONG"); + BOOST_CHECK_EQUAL(std::get<2>(resp).value(), "OK"); +} + +struct response_error_tag { }; +response_error_tag error_tag_obj; + +struct response_error_adapter { + void on_init() { } + void on_done() { } + + void on_node( + boost::redis::resp3::basic_node const&, + boost::system::error_code& ec) + { + ec = boost::redis::error::incompatible_size; + } +}; + +auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } + +BOOST_AUTO_TEST_CASE(test_push_adapter) +{ + net::io_context ioc; + auto conn = std::make_shared(ioc); + + request req; + req.push("HELLO", 3); + req.push("PING"); + req.push("SUBSCRIBE", "channel"); + req.push("PING"); + + conn->set_receive_response(error_tag_obj); + + bool push_received = false, exec_finished = false, run_finished = false; + + conn->async_receive2([&, conn](error_code ec) { + BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled); + push_received = true; + }); + + conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { + BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + exec_finished = true; + }); + + auto cfg = make_test_config(); + cfg.reconnect_wait_interval = 0s; + conn->async_run(cfg, [&run_finished](error_code ec) { + BOOST_CHECK_EQUAL(ec, redis::error::incompatible_size); + run_finished = true; + }); + + ioc.run_for(test_timeout); + BOOST_TEST(push_received); + BOOST_TEST(exec_finished); + BOOST_TEST(run_finished); + + // TODO: Reset the ioc reconnect and send a quit to ensure + // reconnection is possible after an error. +} + +void launch_push_consumer(std::shared_ptr conn) +{ + conn->async_receive2([conn](error_code ec) { + if (ec) { + BOOST_TEST(ec == net::experimental::error::channel_cancelled); + return; + } + launch_push_consumer(conn); + }); +} + +BOOST_AUTO_TEST_CASE(many_subscribers) +{ + request req0; + req0.get_config().cancel_on_connection_lost = false; + req0.push("HELLO", 3); + + request req1; + req1.get_config().cancel_on_connection_lost = false; + req1.push("PING", "Message1"); + + request req2; + req2.get_config().cancel_on_connection_lost = false; + req2.push("SUBSCRIBE", "channel"); + + request req3; + req3.get_config().cancel_on_connection_lost = false; + req3.push("QUIT"); + + net::io_context ioc; + auto conn = std::make_shared(ioc); + + bool finished = false; + + auto c11 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->cancel(operation::reconnection); + finished = true; + }; + auto c10 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req3, ignore, c11); + }; + auto c9 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c10); + }; + auto c8 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req1, ignore, c9); + }; + auto c7 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c8); + }; + auto c6 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c7); + }; + auto c5 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req1, ignore, c6); + }; + auto c4 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c5); + }; + auto c3 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req1, ignore, c4); + }; + auto c2 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c3); + }; + auto c1 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c2); + }; + auto c0 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req1, ignore, c1); + }; + + conn->async_exec(req0, ignore, c0); + launch_push_consumer(conn); + + run(conn, make_test_config(), {}); + + ioc.run_for(test_timeout); + BOOST_TEST(finished); +} + +BOOST_AUTO_TEST_CASE(test_unsubscribe) +{ + net::io_context ioc; + connection conn{ioc}; + + // Subscribe to 3 channels and 2 patterns. Use CLIENT INFO to verify this took effect + request req_subscribe; + req_subscribe.push("SUBSCRIBE", "ch1", "ch2", "ch3"); + req_subscribe.push("PSUBSCRIBE", "ch1*", "ch2*"); + req_subscribe.push("CLIENT", "INFO"); + + // Then, unsubscribe from some of them, and verify again + request req_unsubscribe; + req_unsubscribe.push("UNSUBSCRIBE", "ch1"); + req_unsubscribe.push("PUNSUBSCRIBE", "ch2*"); + req_unsubscribe.push("CLIENT", "INFO"); + + // Finally, ping to verify that the connection is still usable + request req_ping; + req_ping.push("PING", "test_unsubscribe"); + + response resp_subscribe, resp_unsubscribe, resp_ping; + + bool subscribe_finished = false, unsubscribe_finished = false, ping_finished = false, + run_finished = false; + + auto on_ping = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + ping_finished = true; + BOOST_TEST(std::get<0>(resp_ping).has_value()); + BOOST_TEST(std::get<0>(resp_ping).value() == "test_unsubscribe"); + conn.cancel(); + }; + + auto on_unsubscribe = [&](error_code ec, std::size_t) { + unsubscribe_finished = true; + BOOST_TEST(ec == error_code()); + BOOST_TEST(std::get<0>(resp_unsubscribe).has_value()); + BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "sub") == "2"); + BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "psub") == "1"); + conn.async_exec(req_ping, resp_ping, on_ping); + }; + + auto on_subscribe = [&](error_code ec, std::size_t) { + subscribe_finished = true; + BOOST_TEST(ec == error_code()); + BOOST_TEST(std::get<0>(resp_subscribe).has_value()); + BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "sub") == "3"); + BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "psub") == "2"); + conn.async_exec(req_unsubscribe, resp_unsubscribe, on_unsubscribe); + }; + + conn.async_exec(req_subscribe, resp_subscribe, on_subscribe); + + conn.async_run(make_test_config(), [&run_finished](error_code ec) { + BOOST_TEST(ec == net::error::operation_aborted); + run_finished = true; + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(subscribe_finished); + BOOST_TEST(unsubscribe_finished); + BOOST_TEST(ping_finished); + BOOST_TEST(run_finished); +} + +} // namespace diff --git a/test/test_issue_50.cpp b/test/test_issue_50.cpp index 98a4c76b..00c04cad 100644 --- a/test/test_issue_50.cpp +++ b/test/test_issue_50.cpp @@ -42,16 +42,15 @@ namespace { // Push consumer auto receiver(std::shared_ptr conn) -> net::awaitable { - std::cout << "uuu" << std::endl; + std::cout << "Entering receiver" << std::endl; while (conn->will_reconnect()) { - std::cout << "dddd" << std::endl; - // Loop reading Redis pushs messages. - for (;;) { - std::cout << "aaaa" << std::endl; - error_code ec; - co_await conn->async_receive(net::redirect_error(ec)); + std::cout << "Reconnect loop" << std::endl; + // Loop reading Redis pushes. + for (error_code ec;;) { + std::cout << "Receive loop" << std::endl; + co_await conn->async_receive2(net::redirect_error(ec)); if (ec) { - std::cout << "Error in async_receive" << std::endl; + std::cout << "Error in async_receive2" << std::endl; break; } } diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 5ec3e92f..246cc0f3 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -22,15 +22,20 @@ using boost::redis::request; using boost::redis::adapter::adapt2; using boost::redis::adapter::result; -using boost::redis::generic_response; +using boost::redis::resp3::tree; +using boost::redis::resp3::flat_tree; +using boost::redis::generic_flat_response; using boost::redis::ignore_t; using boost::redis::resp3::detail::deserialize; using boost::redis::resp3::node; +using boost::redis::resp3::node_view; using boost::redis::resp3::to_string; using boost::redis::response; using boost::redis::any_adapter; using boost::system::error_code; +namespace resp3 = boost::redis::resp3; + #define RESP3_SET_PART1 "~6\r\n+orange\r" #define RESP3_SET_PART2 "\n+apple\r\n+one" #define RESP3_SET_PART3 "\r\n+two\r" @@ -42,7 +47,9 @@ BOOST_AUTO_TEST_CASE(low_level_sync_sans_io) try { result> resp; - deserialize(resp3_set, adapt2(resp)); + error_code ec; + deserialize(resp3_set, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); for (auto const& e : resp.value()) std::cout << e << std::endl; @@ -65,7 +72,9 @@ BOOST_AUTO_TEST_CASE(issue_210_empty_set) char const* wire = "*4\r\n:1\r\n~0\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; - deserialize(wire, adapt2(resp)); + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK(std::get<1>(resp.value()).value().empty()); @@ -91,7 +100,9 @@ BOOST_AUTO_TEST_CASE(issue_210_non_empty_set_size_one) char const* wire = "*4\r\n:1\r\n~1\r\n$3\r\nfoo\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; - deserialize(wire, adapt2(resp)); + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().size(), 1u); @@ -118,7 +129,9 @@ BOOST_AUTO_TEST_CASE(issue_210_non_empty_set_size_two) char const* wire = "*4\r\n:1\r\n~2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; - deserialize(wire, adapt2(resp)); + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().at(0), std::string{"foo"}); @@ -140,7 +153,9 @@ BOOST_AUTO_TEST_CASE(issue_210_no_nested) char const* wire = "*4\r\n:1\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n"; - deserialize(wire, adapt2(resp)); + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value(), std::string{"foo"}); @@ -159,7 +174,10 @@ BOOST_AUTO_TEST_CASE(issue_233_array_with_null) result>> resp; char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n"; - deserialize(wire, adapt2(resp)); + + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(resp.value().at(0).value(), "one"); BOOST_TEST(!resp.value().at(1).has_value()); @@ -177,7 +195,10 @@ BOOST_AUTO_TEST_CASE(issue_233_optional_array_with_null) result>>> resp; char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n"; - deserialize(wire, adapt2(resp)); + + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(resp.value().value().at(0).value(), "one"); BOOST_TEST(!resp.value().value().at(1).has_value()); @@ -313,3 +334,172 @@ BOOST_AUTO_TEST_CASE(check_counter_adapter) BOOST_CHECK_EQUAL(node, 7); BOOST_CHECK_EQUAL(done, 1); } + +namespace boost::redis::resp3 { + +template +std::ostream& operator<<(std::ostream& os, basic_node const& nd) +{ + os << "type: " << to_string(nd.data_type) << "\n" + << "aggregate_size: " << nd.aggregate_size << "\n" + << "depth: " << nd.depth << "\n" + << "value: " << nd.value << "\n"; + return os; +} + +template +std::ostream& operator<<(std::ostream& os, basic_tree const& resp) +{ + for (auto const& e: resp) + os << e << ","; + return os; +} + +} + +node from_node_view(node_view const& v) +{ + node ret; + ret.data_type = v.data_type; + ret.aggregate_size = v.aggregate_size; + ret.depth = v.depth; + ret.value = v.value; + return ret; +} + +tree from_flat(flat_tree const& resp) +{ + tree ret; + for (auto const& e: resp.get_view()) + ret.push_back(from_node_view(e)); + + return ret; +} + +tree from_flat(generic_flat_response const& resp) +{ + tree ret; + for (auto const& e: resp.value().get_view()) + ret.push_back(from_node_view(e)); + + return ret; +} + + +// Parses the same data into a tree and a +// flat_tree, they should be equal to each other. +BOOST_AUTO_TEST_CASE(flat_tree_views_are_set) +{ + tree resp1; + flat_tree resp2; + generic_flat_response resp3; + + error_code ec; + deserialize(resp3_set, adapt2(resp1), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + deserialize(resp3_set, adapt2(resp2), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + deserialize(resp3_set, adapt2(resp3), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + BOOST_CHECK_EQUAL(resp2.get_reallocs(), 1u); + BOOST_CHECK_EQUAL(resp2.get_total_msgs(), 1u); + + BOOST_CHECK_EQUAL(resp3.value().get_reallocs(), 1u); + BOOST_CHECK_EQUAL(resp3.value().get_total_msgs(), 1u); + + auto const tmp2 = from_flat(resp2); + BOOST_CHECK_EQUAL(resp1, tmp2); + + auto const tmp3 = from_flat(resp3); + BOOST_CHECK_EQUAL(resp1, tmp3); +} + +// The response should be reusable. +BOOST_AUTO_TEST_CASE(flat_tree_reuse) +{ + flat_tree tmp; + + // First use + error_code ec; + deserialize(resp3_set, adapt2(tmp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + BOOST_CHECK_EQUAL(tmp.get_reallocs(), 1u); + BOOST_CHECK_EQUAL(tmp.get_total_msgs(), 1u); + + // Copy to compare after the reuse. + auto const resp1 = tmp.get_view(); + tmp.clear(); + + // Second use + deserialize(resp3_set, adapt2(tmp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + // No reallocation this time + BOOST_CHECK_EQUAL(tmp.get_reallocs(), 0u); + BOOST_CHECK_EQUAL(tmp.get_total_msgs(), 1u); + + BOOST_CHECK_EQUAL(resp1, tmp.get_view()); +} + +BOOST_AUTO_TEST_CASE(flat_tree_copy_assign) +{ + flat_tree resp; + + error_code ec; + deserialize(resp3_set, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + // Copy + resp3::flat_tree copy1{resp}; + + // Copy assignment + resp3::flat_tree copy2 = resp; + + // Assignment + resp3::flat_tree copy3; + copy3 = resp; + + BOOST_TEST((copy1 == resp)); + BOOST_TEST((copy2 == resp)); + BOOST_TEST((copy3 == resp)); +} + +BOOST_AUTO_TEST_CASE(generic_flat_response_simple_error) +{ + generic_flat_response resp; + + char const* wire = "-Error\r\n"; + + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + BOOST_TEST(!resp.has_value()); + BOOST_TEST(resp.has_error()); + auto const error = resp.error(); + + BOOST_CHECK_EQUAL(error.data_type, boost::redis::resp3::type::simple_error); + BOOST_CHECK_EQUAL(error.diagnostic, std::string{"Error"}); +} + +BOOST_AUTO_TEST_CASE(generic_flat_response_blob_error) +{ + generic_flat_response resp; + + char const* wire = "!5\r\nError\r\n"; + + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + BOOST_TEST(!resp.has_value()); + BOOST_TEST(resp.has_error()); + auto const error = resp.error(); + + BOOST_CHECK_EQUAL(error.data_type, boost::redis::resp3::type::blob_error); + BOOST_CHECK_EQUAL(error.diagnostic, std::string{"Error"}); +}