diff --git a/README.md b/README.md index 9b2db656..af0b575b 100644 --- a/README.md +++ b/README.md @@ -87,9 +87,9 @@ them are: * [Client-side caching](https://redis.io/docs/manual/client-side-caching/). The connection class supports server pushes by means of the -`connection::async_receive` function, which can be +`connection::async_receive2` function, which can be called in the same connection that is being used to execute commands. -The coroutine below shows how to use it: +The coroutine below shows how to use it ```cpp @@ -99,26 +99,25 @@ receiver(std::shared_ptr conn) -> net::awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_response resp; + flat_tree resp; conn->set_receive_response(resp); // Loop while reconnection is enabled while (conn->will_reconnect()) { // Reconnect to channels. - co_await conn->async_exec(req, ignore); + co_await conn->async_exec(req); // Loop reading Redis pushes. - for (;;) { - error_code ec; - co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec)); + for (error_code ec;;) { + co_await conn->async_receive2(resp, redirect_error(ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. // Use the response resp in some way and then clear it. ... - consume_one(resp); + resp.clear(); } } } @@ -126,4 +125,4 @@ receiver(std::shared_ptr conn) -> net::awaitable ## Further reading -Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html). \ No newline at end of file +Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html). diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index fe9f04f1..36b83d49 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -97,9 +97,9 @@ them are: * https://redis.io/docs/manual/client-side-caching/[Client-side caching]. The connection class supports server pushes by means of the -xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive`] function, which can be +xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive2`] function, which can be called in the same connection that is being used to execute commands. -The coroutine below shows how to use it: +The coroutine below shows how to use it [source,cpp] @@ -110,26 +110,25 @@ receiver(std::shared_ptr conn) -> net::awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_response resp; + flat_tree resp; conn->set_receive_response(resp); // Loop while reconnection is enabled while (conn->will_reconnect()) { // Reconnect to channels. - co_await conn->async_exec(req, ignore); + co_await conn->async_exec(req); // Loop reading Redis pushes. - for (;;) { - error_code ec; - co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec)); + for (error_code ec;;) { + co_await conn->async_receive2(resp, redirect_error(ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. - // Use the response resp in some way and then clear it. + // Use the response here and then clear it. ... - consume_one(resp); + resp.clear(); } } } diff --git a/example/cpp20_chat_room.cpp b/example/cpp20_chat_room.cpp index 4a820d52..89ae7869 100644 --- a/example/cpp20_chat_room.cpp +++ b/example/cpp20_chat_room.cpp @@ -30,11 +30,9 @@ using boost::asio::consign; using boost::asio::detached; using boost::asio::dynamic_buffer; using boost::asio::redirect_error; -using boost::asio::use_awaitable; using boost::redis::config; using boost::redis::connection; -using boost::redis::generic_flat_response; -using boost::redis::ignore; +using boost::redis::resp3::flat_tree; using boost::redis::request; using boost::system::error_code; using namespace std::chrono_literals; @@ -47,21 +45,25 @@ auto receiver(std::shared_ptr conn) -> awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_flat_response resp; + flat_tree resp; conn->set_receive_response(resp); while (conn->will_reconnect()) { // Subscribe to channels. - co_await conn->async_exec(req, ignore); + co_await conn->async_exec(req); // Loop reading Redis push messages. for (error_code ec;;) { - co_await conn->async_receive(redirect_error(use_awaitable, ec)); + co_await conn->async_receive2(redirect_error(ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. - std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " " - << resp.value().at(3).value << std::endl; - resp.value().clear(); + + for (auto const& elem: resp.get_view()) + std::cout << elem.value << "\n"; + + std::cout << std::endl; + + resp.clear(); } } } @@ -74,7 +76,7 @@ auto publisher(std::shared_ptr in, std::shared_ptrasync_exec(req, ignore); + co_await conn->async_exec(req); msg.erase(0, n); } } diff --git a/example/cpp20_streams.cpp b/example/cpp20_streams.cpp index 14c14087..1cef143f 100644 --- a/example/cpp20_streams.cpp +++ b/example/cpp20_streams.cpp @@ -23,7 +23,7 @@ namespace net = boost::asio; using boost::redis::config; -using boost::redis::generic_flat_response; +using boost::redis::generic_response; using boost::redis::operation; using boost::redis::request; using boost::redis::connection; @@ -33,7 +33,7 @@ auto stream_reader(std::shared_ptr conn) -> net::awaitable { std::string redisStreamKey_; request req; - generic_flat_response resp; + generic_response resp; std::string stream_id{"$"}; std::string const field = "myfield"; @@ -51,7 +51,7 @@ auto stream_reader(std::shared_ptr conn) -> net::awaitable // The following approach was taken in order to be able to // deal with the responses, as generated by redis in the case // that there are multiple stream 'records' within a single - // generic_flat_response. The nesting and number of values in + // generic_response. The nesting and number of values in // resp.value() are different, depending on the contents // of the stream in redis. Uncomment the above commented-out // code for examples while running the XADD command. diff --git a/example/cpp20_subscriber.cpp b/example/cpp20_subscriber.cpp index 40017835..531fdaf9 100644 --- a/example/cpp20_subscriber.cpp +++ b/example/cpp20_subscriber.cpp @@ -1,11 +1,10 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) */ #include -#include #include #include @@ -22,12 +21,8 @@ namespace asio = boost::asio; using namespace std::chrono_literals; using boost::redis::request; -using boost::redis::generic_flat_response; -using boost::redis::consume_one; -using boost::redis::logger; +using boost::redis::resp3::flat_tree; using boost::redis::config; -using boost::redis::ignore; -using boost::redis::error; using boost::system::error_code; using boost::redis::connection; using asio::signal_set; @@ -54,30 +49,29 @@ auto receiver(std::shared_ptr conn) -> asio::awaitable request req; req.push("SUBSCRIBE", "channel"); - generic_flat_response resp; + flat_tree resp; conn->set_receive_response(resp); // Loop while reconnection is enabled while (conn->will_reconnect()) { // Reconnect to the channels. - co_await conn->async_exec(req, ignore); + co_await conn->async_exec(req); - // Loop reading Redis pushs messages. + // Loop to read Redis push messages. for (error_code ec;;) { - // First tries to read any buffered pushes. - conn->receive(ec); - if (ec == error::sync_receive_push_failed) { - ec = {}; - co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec)); - } - + // Wait for pushes + co_await conn->async_receive2(asio::redirect_error(ec)); if (ec) break; // Connection lost, break so we can reconnect to channels. - std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " " - << resp.value().at(3).value << std::endl; + // The response must be consumed without suspending the + // coroutine i.e. without the use of async operations. + for (auto const& elem: resp.get_view()) + std::cout << elem.value << "\n"; - consume_one(resp); + std::cout << std::endl; + + resp.clear(); } } } diff --git a/include/boost/redis/adapter/any_adapter.hpp b/include/boost/redis/adapter/any_adapter.hpp index 0311a28d..a32e2545 100644 --- a/include/boost/redis/adapter/any_adapter.hpp +++ b/include/boost/redis/adapter/any_adapter.hpp @@ -53,7 +53,6 @@ public: static auto create_impl(T& resp) -> impl_t { using namespace boost::redis::adapter; - return [adapter2 = boost_redis_adapt(resp)]( any_adapter::parse_event ev, resp3::node_view const& nd, diff --git a/include/boost/redis/adapter/detail/adapters.hpp b/include/boost/redis/adapter/detail/adapters.hpp index 9c485430..432fa9bd 100644 --- a/include/boost/redis/adapter/detail/adapters.hpp +++ b/include/boost/redis/adapter/detail/adapters.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -138,6 +137,8 @@ void boost_redis_from_bulk(T& t, resp3::basic_node const& node, system:: from_bulk_impl::apply(t, node, ec); } +//================================================ + template class general_aggregate { private: @@ -177,37 +178,54 @@ public: }; template <> -class general_aggregate> { +class general_aggregate { private: - result* result_; + resp3::tree* tree_ = nullptr; public: - explicit general_aggregate(result* c = nullptr) - : result_(c) + explicit general_aggregate(resp3::tree* c = nullptr) + : tree_(c) + { } + + void on_init() { } + void on_done() { } + + template + void on_node(resp3::basic_node const& nd, system::error_code&) + { + BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer"); + + resp3::node tmp; + tmp.data_type = nd.data_type; + tmp.aggregate_size = nd.aggregate_size; + tmp.depth = nd.depth; + tmp.value = std::string{std::cbegin(nd.value), std::cend(nd.value)}; + + tree_->push_back(std::move(tmp)); + } +}; + +template <> +class general_aggregate { +private: + resp3::flat_tree* tree_ = nullptr; + +public: + explicit general_aggregate(resp3::flat_tree* c = nullptr) + : tree_(c) { } void on_init() { } void on_done() { - if (result_->has_value()) { - result_->value().set_view(); - } + tree_->notify_done(); } template - void operator()(resp3::basic_node const& nd, system::error_code&) + void on_node(resp3::basic_node const& nd, system::error_code&) { - BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer"); - switch (nd.data_type) { - case resp3::type::blob_error: - case resp3::type::simple_error: - *result_ = error{ - nd.data_type, - std::string{std::cbegin(nd.value), std::cend(nd.value)} - }; - break; - default: result_->value().add_node(nd); - } + BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer"); + tree_->push(nd); } }; diff --git a/include/boost/redis/adapter/detail/response_traits.hpp b/include/boost/redis/adapter/detail/response_traits.hpp index 1ab2c589..49385812 100644 --- a/include/boost/redis/adapter/detail/response_traits.hpp +++ b/include/boost/redis/adapter/detail/response_traits.hpp @@ -92,8 +92,24 @@ struct response_traits> { }; template -struct response_traits, Allocator>>> { - using response_type = result, Allocator>>; +struct response_traits>> { + using response_type = result>; + using adapter_type = general_aggregate; + + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + +template +struct response_traits> { + using response_type = resp3::basic_tree; + using adapter_type = general_aggregate; + + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + +template <> +struct response_traits { + using response_type = resp3::flat_tree; using adapter_type = general_aggregate; static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } @@ -107,13 +123,6 @@ struct response_traits> { static auto adapt(response_type& r) noexcept { return adapter_type{r}; } }; -template <> -struct response_traits { - using response_type = generic_flat_response; - using adapter_type = vector_adapter; - - static auto adapt(response_type& v) noexcept { return adapter_type{v}; } -}; } // namespace boost::redis::adapter::detail #endif // BOOST_REDIS_ADAPTER_DETAIL_RESPONSE_TRAITS_HPP diff --git a/include/boost/redis/adapter/detail/result_traits.hpp b/include/boost/redis/adapter/detail/result_traits.hpp index 2e158a39..67131819 100644 --- a/include/boost/redis/adapter/detail/result_traits.hpp +++ b/include/boost/redis/adapter/detail/result_traits.hpp @@ -13,7 +13,8 @@ #include #include #include -#include +#include +#include #include @@ -57,19 +58,26 @@ struct result_traits>> { }; template -struct result_traits, Allocator>>> { +struct result_traits>> { using response_type = result, Allocator>>; using adapter_type = adapter::detail::general_aggregate; static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } }; -template <> -struct result_traits { - using response_type = generic_flat_response; +template +struct result_traits> { + using response_type = resp3::basic_tree; using adapter_type = adapter::detail::general_aggregate; static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } }; +template <> +struct result_traits { + using response_type = resp3::flat_tree; + using adapter_type = general_aggregate; + static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } +}; + template using adapter_t = typename result_traits>::adapter_type; diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index e83a8946..25791fea 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -207,6 +207,50 @@ struct connection_impl { { st_.mpx.set_receive_adapter(std::move(adapter)); } + + std::size_t receive(system::error_code& ec) + { + std::size_t size = 0; + + auto f = [&](system::error_code const& ec2, std::size_t n) { + ec = ec2; + size = n; + }; + + auto const res = receive_channel_.try_receive(f); + if (ec) + return 0; + + if (!res) + ec = error::sync_receive_push_failed; + + return size; + } + + template + auto async_receive2(CompletionToken&& token) + { + return + receive_channel_.async_receive( + asio::deferred( + [this](system::error_code ec, std::size_t) + { + if (!ec) { + auto f = [](system::error_code, std::size_t) { + // There is no point in checking for errors + // here since async_receive just completed + // without errors. + }; + + // We just want to drain the channel. + while (receive_channel_.try_receive(f)); + } + + return asio::deferred.values(ec); + } + ) + )(std::forward(token)); + } }; template @@ -593,7 +637,7 @@ public: return async_run(config{}, std::forward(token)); } - /** @brief Receives server side pushes asynchronously. + /** @brief (Deprecated) Receives server side pushes asynchronously. * * When pushes arrive and there is no `async_receive` operation in * progress, pushed data, requests, and responses will be paused @@ -623,12 +667,57 @@ public: * @param token Completion token. */ template > + BOOST_DEPRECATED("Please use async_receive2 instead.") auto async_receive(CompletionToken&& token = {}) { return impl_->receive_channel_.async_receive(std::forward(token)); } - /** @brief Receives server pushes synchronously without blocking. + /** @brief Wait for server pushes asynchronously + * + * This function suspends until a server push is received by the + * connection. On completion an unspecified number of pushes will + * have been added to the response object set with @ref + * boost::redis::connection::set_receive_response. + * + * To prevent receiving an unbound number of pushes the connection + * blocks further read operations on the socket when 256 pushes + * accumulate internally (we don't make any commitment to this + * exact number). When that happens any `async_exec`s and + * health-checks won't make any progress and the connection may + * eventually timeout. To avoid that Apps should call + * `async_receive2` continuously in a loop. + * + * @Note To avoid deadlocks the task (e.g. coroutine) calling + * `async_receive2` should not call `async_exec` in a way where + * they could block each other. + * + * For an example see cpp20_subscriber.cpp. The completion token + * must have the following signature + * + * @code + * void f(system::error_code); + * @endcode + * + * @par Per-operation cancellation + * This operation supports the following cancellation types: + * + * @li `asio::cancellation_type_t::terminal`. + * @li `asio::cancellation_type_t::partial`. + * @li `asio::cancellation_type_t::total`. + * + * Calling `basic_connection::cancel(operation::receive)` will + * also cancel any ongoing receive operations. + * + * @param token Completion token. + */ + template > + auto async_receive2(CompletionToken&& token = {}) + { + return impl_->async_receive2(std::forward(token)); + } + + /** @brief (Deprecated) Receives server pushes synchronously without blocking. * * Receives a server push synchronously by calling `try_receive` on * the underlying channel. If the operation fails because @@ -638,23 +727,10 @@ public: * @param ec Contains the error if any occurred. * @returns The number of bytes read from the socket. */ + BOOST_DEPRECATED("Please, use async_receive2 instead.") std::size_t receive(system::error_code& ec) { - std::size_t size = 0; - - auto f = [&](system::error_code const& ec2, std::size_t n) { - ec = ec2; - size = n; - }; - - auto const res = impl_->receive_channel_.try_receive(f); - if (ec) - return 0; - - if (!res) - ec = error::sync_receive_push_failed; - - return size; + return impl_->receive(ec); } /** @brief Executes commands on the Redis server asynchronously. @@ -837,7 +913,7 @@ public: "the other member functions to interact with the connection.") auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); } - /// Sets the response object of @ref async_receive operations. + /// Sets the response object of @ref async_receive2 operations. template void set_receive_response(Response& resp) { @@ -1028,13 +1104,25 @@ public: /// @copydoc basic_connection::async_receive template + BOOST_DEPRECATED("Please use async_receive2 instead.") auto async_receive(CompletionToken&& token = {}) { return impl_.async_receive(std::forward(token)); } + /// @copydoc basic_connection::async_receive2 + template + auto async_receive2(CompletionToken&& token = {}) + { + return impl_.async_receive2(std::forward(token)); + } + /// @copydoc basic_connection::receive - std::size_t receive(system::error_code& ec) { return impl_.receive(ec); } + BOOST_DEPRECATED("Please use async_receive2 instead.") + std::size_t receive(system::error_code& ec) + { + return impl_.impl_->receive(ec); + } /** * @brief Calls @ref boost::redis::basic_connection::async_exec. diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 506d0139..f39699f3 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -74,7 +74,7 @@ enum class error /// SSL handshake timeout ssl_handshake_timeout, - /// Can't receive push synchronously without blocking + /// (Deprecated) Can't receive push synchronously without blocking sync_receive_push_failed, /// Incompatible node depth. diff --git a/include/boost/redis/impl/flat_tree.ipp b/include/boost/redis/impl/flat_tree.ipp new file mode 100644 index 00000000..48be5040 --- /dev/null +++ b/include/boost/redis/impl/flat_tree.ipp @@ -0,0 +1,125 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Nikolai Vladimirov (nvladimirov.work@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include + +namespace boost::redis::resp3 { + +flat_tree::flat_tree(flat_tree const& other) +: data_{other.data_} +, view_tree_{other.view_tree_} +, ranges_{other.ranges_} +, pos_{0u} +, reallocs_{0u} +, total_msgs_{other.total_msgs_} +{ + view_tree_.resize(ranges_.size()); + set_views(); +} + +flat_tree& +flat_tree::operator=(flat_tree other) +{ + swap(*this, other); + return *this; +} + +void flat_tree::reserve(std::size_t bytes, std::size_t nodes) +{ + data_.reserve(bytes); + view_tree_.reserve(nodes); + ranges_.reserve(nodes); +} + +void flat_tree::clear() +{ + pos_ = 0u; + total_msgs_ = 0u; + reallocs_ = 0u; + data_.clear(); + view_tree_.clear(); + ranges_.clear(); +} + +void flat_tree::set_views() +{ + BOOST_ASSERT_MSG(pos_ < view_tree_.size(), "notify_done called but no nodes added."); + BOOST_ASSERT_MSG(view_tree_.size() == ranges_.size(), "Incompatible sizes."); + + for (; pos_ < view_tree_.size(); ++pos_) { + auto const& r = ranges_.at(pos_); + view_tree_.at(pos_).value = std::string_view{data_.data() + r.offset, r.size}; + } +} + +void flat_tree::notify_done() +{ + total_msgs_ += 1; + set_views(); +} + +void flat_tree::push(node_view const& node) +{ + auto data_before = data_.data(); + add_node_impl(node); + auto data_after = data_.data(); + + if (data_after != data_before) { + pos_ = 0; + reallocs_ += 1; + } +} + +void flat_tree::add_node_impl(node_view const& node) +{ + ranges_.push_back({data_.size(), node.value.size()}); + + // This must come after setting the offset above. + data_.append(node.value.data(), node.value.size()); + + view_tree_.push_back(node); +} + +void swap(flat_tree& a, flat_tree& b) +{ + using std::swap; + + swap(a.data_, b.data_); + swap(a.view_tree_, b.view_tree_); + swap(a.ranges_, b.ranges_); + swap(a.pos_, b.pos_); + swap(a.reallocs_, b.reallocs_); + swap(a.total_msgs_, b.total_msgs_); +} + +bool +operator==( + flat_tree::range const& a, + flat_tree::range const& b) +{ + return a.offset == b.offset && a.size == b.size; +} + +bool operator==(flat_tree const& a, flat_tree const& b) +{ + return + a.data_ == b.data_ && + a.view_tree_ == b.view_tree_ && + a.ranges_ == b.ranges_ && + a.pos_ == b.pos_ && + //a.reallocs_ == b.reallocs_ && + a.total_msgs_ == b.total_msgs_; +} + +bool operator!=(flat_tree const& a, flat_tree const& b) +{ + return !(a == b); +} + +} // namespace boost::redis diff --git a/include/boost/redis/impl/response.ipp b/include/boost/redis/impl/response.ipp index 21e7b287..a4b09a6e 100644 --- a/include/boost/redis/impl/response.ipp +++ b/include/boost/redis/impl/response.ipp @@ -11,30 +11,15 @@ namespace boost::redis { -namespace { -template -auto& get_value(Container& c) -{ - return c; -} - -template <> -auto& get_value(flat_response_value& c) -{ - return c.view(); -} - -template -void consume_one_impl(Response& r, system::error_code& ec) +void consume_one(generic_response& r, system::error_code& ec) { if (r.has_error()) return; // Nothing to consume. - auto& value = get_value(r.value()); - if (std::empty(value)) + if (std::empty(r.value())) return; // Nothing to consume. - auto const depth = value.front().depth; + auto const depth = r.value().front().depth; // To simplify we will refuse to consume any data-type that is not // a root node. I think there is no use for that and it is complex @@ -48,17 +33,11 @@ void consume_one_impl(Response& r, system::error_code& ec) return e.depth == depth; }; - auto match = std::find_if(std::next(std::cbegin(value)), std::cend(value), f); + auto match = std::find_if(std::next(std::cbegin(r.value())), std::cend(r.value()), f); - value.erase(std::cbegin(value), match); + r.value().erase(std::cbegin(r.value()), match); } -} // namespace - -void consume_one(generic_response& r, system::error_code& ec) { consume_one_impl(r, ec); } - -void consume_one(generic_flat_response& r, system::error_code& ec) { consume_one_impl(r, ec); } - void consume_one(generic_response& r) { system::error_code ec; @@ -67,12 +46,4 @@ void consume_one(generic_response& r) throw system::system_error(ec); } -void consume_one(generic_flat_response& r) -{ - system::error_code ec; - consume_one(r, ec); - if (ec) - throw system::system_error(ec); -} - } // namespace boost::redis diff --git a/include/boost/redis/resp3/flat_tree.hpp b/include/boost/redis/resp3/flat_tree.hpp new file mode 100644 index 00000000..90b23214 --- /dev/null +++ b/include/boost/redis/resp3/flat_tree.hpp @@ -0,0 +1,131 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Nikolai Vladimirov (nvladimirov.work@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_RESP3_FLAT_TREE_HPP +#define BOOST_REDIS_RESP3_FLAT_TREE_HPP + +#include +#include + +#include +#include + +namespace boost::redis { + +namespace adapter::detail { + template class general_aggregate; +} + +namespace resp3 { + +/** @brief A generic-response that stores data contiguously + * + * Similar to the @ref boost::redis::resp3::tree but data is + * stored contiguously. + */ +struct flat_tree { +public: + /// Default constructor + flat_tree() = default; + + /// Move constructor + flat_tree(flat_tree&&) noexcept = default; + + /// Copy constructor + flat_tree(flat_tree const& other); + + /// Copy assignment + flat_tree& operator=(flat_tree other); + + friend void swap(flat_tree&, flat_tree&); + + friend + bool operator==(flat_tree const&, flat_tree const&); + + friend + bool operator!=(flat_tree const&, flat_tree const&); + + /** @brief Reserve capacity + * + * Reserve memory for incoming data. + * + * @param bytes Number of bytes to reserve for data. + * @param nodes Number of nodes to reserve. + */ + void reserve(std::size_t bytes, std::size_t nodes); + + /** @brief Clear both the data and the node buffers + * + * @Note: A `boost::redis:.flat_tree` can contain the + * response to multiple Redis commands and server pushes. Calling + * this function will erase everything contained in it. + */ + void clear(); + + /// Returns the size of the data buffer + auto data_size() const noexcept -> std::size_t + { return data_.size(); } + + /// Returns the RESP3 response + auto get_view() const -> view_tree const& + { return view_tree_; } + + /** @brief Returns the number of times reallocation took place + * + * This function returns how many reallocations were performed and + * can be useful to determine how much memory to reserve upfront. + */ + auto get_reallocs() const noexcept -> std::size_t + { return reallocs_; } + + /// Returns the number of complete RESP3 messages contained in this object. + std::size_t get_total_msgs() const noexcept + { return total_msgs_; } + +private: + template friend class adapter::detail::general_aggregate; + + // Notify the object that all nodes were pushed. + void notify_done(); + + // Push a new node to the response + void push(node_view const& node); + + void add_node_impl(node_view const& node); + + void set_views(); + + // Range into the data buffer. + struct range { + std::size_t offset; + std::size_t size; + }; + + friend bool operator==(range const&, range const&); + + std::string data_; + view_tree view_tree_; + std::vector ranges_; + std::size_t pos_ = 0u; + std::size_t reallocs_ = 0u; + std::size_t total_msgs_ = 0u; +}; + +/// Swaps two responses +void swap(flat_tree&, flat_tree&); + +/// Equality operator +bool operator==(flat_tree const&, flat_tree const&); + +/// Inequality operator +bool operator!=(flat_tree const&, flat_tree const&); + +} // resp3 +} // namespace boost::redis + +#endif // BOOST_REDIS_RESP3_FLAT_TREE_HPP diff --git a/include/boost/redis/resp3/node.hpp b/include/boost/redis/resp3/node.hpp index 84fb5fca..1e284dab 100644 --- a/include/boost/redis/resp3/node.hpp +++ b/include/boost/redis/resp3/node.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -53,27 +53,19 @@ auto operator==(basic_node const& a, basic_node const& b) // clang-format on }; +/// Inequality operator for RESP3 nodes +template +auto operator!=(basic_node const& a, basic_node const& b) +{ + return !(a == b); +}; + /// A node in the response tree that owns its data. using node = basic_node; /// A node in the response tree that does not own its data. using node_view = basic_node; -struct offset_string { - std::string_view data; - std::size_t offset{}; - std::size_t size{}; - - operator std::string() const { return std::string{data}; } - - friend std::ostream& operator<<(std::ostream& os, offset_string const& s) - { - return os << s.data; - } -}; - -using offset_node = basic_node; - } // namespace boost::redis::resp3 #endif // BOOST_REDIS_RESP3_NODE_HPP diff --git a/include/boost/redis/resp3/tree.hpp b/include/boost/redis/resp3/tree.hpp new file mode 100644 index 00000000..fa4f4c66 --- /dev/null +++ b/include/boost/redis/resp3/tree.hpp @@ -0,0 +1,29 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_RESP3_TREE_HPP +#define BOOST_REDIS_RESP3_TREE_HPP + +#include + +#include +#include + +namespace boost::redis::resp3 { + +/// A RESP3 tree that owns its data. +template >> +using basic_tree = std::vector, Allocator>; + +/// A RESP3 tree that owns its data. +using tree = basic_tree; + +/// A RESP3 tree whose data are `std::string_views`. +using view_tree = basic_tree; + +} + +#endif // BOOST_REDIS_RESP3_RESPONSE_HPP diff --git a/include/boost/redis/response.hpp b/include/boost/redis/response.hpp index 304ca1e6..5622f3a8 100644 --- a/include/boost/redis/response.hpp +++ b/include/boost/redis/response.hpp @@ -9,6 +9,7 @@ #include #include +#include #include @@ -29,85 +30,9 @@ using response = std::tuple...>; * [pre-order](https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR) * view of the response tree. */ -using generic_response = adapter::result>; +using generic_response = adapter::result; -/** - * Forward declaration to allow friendship with the template class - * that manages filling of flat_response_value. - */ -namespace adapter::detail { -template -class general_aggregate; -} - -struct flat_response_value { -public: - /// Reserve capacity for nodes and data storage. - void reserve(std::size_t num_nodes, std::size_t string_size) - { - data_.reserve(num_nodes * string_size); - view_.reserve(num_nodes); - } - - void clear() - { - data_.clear(); - view_.clear(); - } - - std::size_t size() const noexcept { return view_.size(); } - bool empty() noexcept { return view_.empty(); } - - resp3::offset_node& at(std::size_t index) { return view_.at(index); } - resp3::offset_node const& at(std::size_t index) const { return view_.at(index); } - - std::vector const& view() const { return view_; } - std::vector& view() { return view_; } - -private: - void set_view() - { - for (auto& node : view_) { - auto& offset_string = node.value; - offset_string.data = std::string_view{ - data_.data() + offset_string.offset, - offset_string.size}; - } - } - - template - void add_node(resp3::basic_node const& nd) - { - resp3::offset_string offset_string; - offset_string.offset = data_.size(); - offset_string.size = nd.value.size(); - - data_.append(nd.value.data(), nd.value.size()); - - resp3::offset_node new_node; - new_node.data_type = nd.data_type; - new_node.aggregate_size = nd.aggregate_size; - new_node.depth = nd.depth; - new_node.value = std::move(offset_string); - - view_.push_back(std::move(new_node)); - } - - template - friend class adapter::detail::general_aggregate; - - std::string data_; - std::vector view_; -}; - -/** @brief A memory-efficient generic response to a request. - * @ingroup high-level-api - * - * Uses a compact buffer to store RESP3 data with reduced allocations. - */ -using generic_flat_response = adapter::result; - -/** @brief Consume on response from a generic response +/** @brief (Deprecated) Consume on response from a generic response * * This function rotates the elements so that the start of the next * response becomes the new front element. For example the output of @@ -146,18 +71,16 @@ using generic_flat_response = adapter::result; * @param r The response to modify. * @param ec Will be populated in case of error. */ +//BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.") void consume_one(generic_response& r, system::error_code& ec); -/// Consume on response from a generic flat response -void consume_one(generic_flat_response& r, system::error_code& ec); - /** - * @brief Throwing overloads of `consume_one`. + * @brief (Deprecated) Throwing overload of `consume_one`. * * @param r The response to modify. */ +//BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.") void consume_one(generic_response& r); -void consume_one(generic_flat_response& r); } // namespace boost::redis diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 09334421..06c2dcd3 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 78a65ee1..4a9bf03c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -56,6 +56,7 @@ make_test(test_conn_run_cancel) make_test(test_conn_check_health) make_test(test_conn_exec) make_test(test_conn_push) +make_test(test_conn_push2) make_test(test_conn_monitor) make_test(test_conn_reconnect) make_test(test_conn_exec_cancel) diff --git a/test/test_any_adapter.cpp b/test/test_any_adapter.cpp index 6d4ee82e..ca123a49 100644 --- a/test/test_any_adapter.cpp +++ b/test/test_any_adapter.cpp @@ -13,7 +13,7 @@ #include using boost::redis::generic_response; -using boost::redis::generic_flat_response; +using boost::redis::resp3::flat_tree; using boost::redis::response; using boost::redis::ignore; using boost::redis::any_adapter; @@ -25,7 +25,7 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types) response r1; response r2; generic_response r3; - generic_flat_response r4; + flat_tree r4; BOOST_CHECK_NO_THROW(any_adapter{r1}); BOOST_CHECK_NO_THROW(any_adapter{r2}); diff --git a/test/test_conn_check_health.cpp b/test/test_conn_check_health.cpp index d101a52a..78062e3b 100644 --- a/test/test_conn_check_health.cpp +++ b/test/test_conn_check_health.cpp @@ -262,4 +262,4 @@ int main() test_flexible().run(); return boost::report_errors(); -} +} \ No newline at end of file diff --git a/test/test_conn_echo_stress.cpp b/test/test_conn_echo_stress.cpp index 5b37489d..48157b0b 100644 --- a/test/test_conn_echo_stress.cpp +++ b/test/test_conn_echo_stress.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -27,6 +27,7 @@ using error_code = boost::system::error_code; using boost::redis::operation; using boost::redis::request; using boost::redis::response; +using boost::redis::resp3::flat_tree; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::logger; @@ -54,39 +55,44 @@ std::ostream& operator<<(std::ostream& os, usage const& u) namespace { -auto push_consumer(connection& conn, int expected) -> net::awaitable +auto +receiver( + connection& conn, + flat_tree& resp, + std::size_t expected) -> net::awaitable { - int c = 0; - for (error_code ec;;) { - conn.receive(ec); - if (ec == error::sync_receive_push_failed) { - ec = {}; - co_await conn.async_receive(net::redirect_error(ec)); - } else if (!ec) { - //std::cout << "Skipping suspension." << std::endl; - } - - if (ec) { - BOOST_TEST(false, "push_consumer error: " << ec.message()); - co_return; - } - if (++c == expected) - break; + std::size_t push_counter = 0; + while (push_counter != expected) { + co_await conn.async_receive2(); + push_counter += resp.get_total_msgs(); + resp.clear(); } conn.cancel(); } -auto echo_session(connection& conn, const request& pubs, int n) -> net::awaitable +auto echo_session(connection& conn, const request& req, std::size_t n) -> net::awaitable { - for (auto i = 0; i < n; ++i) - co_await conn.async_exec(pubs); + for (auto i = 0u; i < n; ++i) + co_await conn.async_exec(req); } void rethrow_on_error(std::exception_ptr exc) { - if (exc) + if (exc) { + BOOST_TEST(false); std::rethrow_exception(exc); + } +} + +request make_pub_req(std::size_t n_pubs) +{ + request req; + req.push("PING"); + for (std::size_t i = 0u; i < n_pubs; ++i) + req.push("PUBLISH", "channel", "payload"); + + return req; } BOOST_AUTO_TEST_CASE(echo_stress) @@ -98,22 +104,22 @@ BOOST_AUTO_TEST_CASE(echo_stress) // Number of coroutines that will send pings sharing the same // connection to redis. - constexpr int sessions = 150; + constexpr std::size_t sessions = 150u; // The number of pings that will be sent by each session. - constexpr int msgs = 200; + constexpr std::size_t msgs = 200u; // The number of publishes that will be sent by each session with // each message. - constexpr int n_pubs = 25; + constexpr std::size_t n_pubs = 25u; // This is the total number of pushes we will receive. - constexpr int total_pushes = sessions * msgs * n_pubs + 1; + constexpr std::size_t total_pushes = sessions * msgs * n_pubs + 1; - request pubs; - pubs.push("PING"); - for (int i = 0; i < n_pubs; ++i) - pubs.push("PUBLISH", "channel", "payload"); + flat_tree resp; + conn.set_receive_response(resp); + + request const pub_req = make_pub_req(n_pubs); // Run the connection bool run_finished = false, subscribe_finished = false; @@ -123,6 +129,10 @@ BOOST_AUTO_TEST_CASE(echo_stress) std::clog << "async_run finished" << std::endl; }); + // Op that will consume the pushes counting down until all expected + // pushes have been received. + net::co_spawn(ctx, receiver(conn, resp, total_pushes), rethrow_on_error); + // Subscribe, then launch the coroutines request req; req.push("SUBSCRIBE", "channel"); @@ -130,12 +140,8 @@ BOOST_AUTO_TEST_CASE(echo_stress) subscribe_finished = true; BOOST_TEST(ec == error_code()); - // Op that will consume the pushes counting down until all expected - // pushes have been received. - net::co_spawn(ctx, push_consumer(conn, total_pushes), rethrow_on_error); - - for (int i = 0; i < sessions; ++i) - net::co_spawn(ctx, echo_session(conn, pubs, msgs), rethrow_on_error); + for (std::size_t i = 0; i < sessions; ++i) + net::co_spawn(ctx, echo_session(conn, pub_req, msgs), rethrow_on_error); }); // Run the test @@ -144,7 +150,13 @@ BOOST_AUTO_TEST_CASE(echo_stress) BOOST_TEST(subscribe_finished); // Print statistics - std::cout << "-------------------\n" << conn.get_usage() << std::endl; + std::cout + << "-------------------\n" + << "Usage data: \n" + << conn.get_usage() << "\n" + << "-------------------\n" + << "Reallocations: " << resp.get_reallocs() + << std::endl; } } // namespace diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index c2d4fc42..28a23497 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -26,7 +26,7 @@ namespace net = boost::asio; using boost::redis::config; using boost::redis::connection; -using boost::redis::generic_flat_response; +using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::operation; using boost::redis::request; diff --git a/test/test_conn_exec_cancel.cpp b/test/test_conn_exec_cancel.cpp index 1870d784..939cd361 100644 --- a/test/test_conn_exec_cancel.cpp +++ b/test/test_conn_exec_cancel.cpp @@ -32,7 +32,7 @@ using boost::redis::operation; using boost::redis::error; using boost::redis::request; using boost::redis::response; -using boost::redis::generic_flat_response; +using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::logger; diff --git a/test/test_conn_exec_cancel2.cpp b/test/test_conn_exec_cancel2.cpp deleted file mode 100644 index 08337461..00000000 --- a/test/test_conn_exec_cancel2.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#include - -#include -#define BOOST_TEST_MODULE conn_exec_cancel -#include - -#include "common.hpp" - -#include - -#ifdef BOOST_ASIO_HAS_CO_AWAIT - -// NOTE1: Sends hello separately. I have observed that if hello and -// blpop are sent toguether, Redis will send the response of hello -// right away, not waiting for blpop. That is why we have to send it -// separately. - -namespace net = boost::asio; -using error_code = boost::system::error_code; -using boost::redis::operation; -using boost::redis::request; -using boost::redis::response; -using boost::redis::generic_flat_response; -using boost::redis::ignore; -using boost::redis::ignore_t; -using boost::redis::config; -using boost::redis::logger; -using boost::redis::connection; -using namespace std::chrono_literals; - -namespace { - -auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable -{ - auto ex = co_await net::this_coro::executor; - - generic_flat_response gresp; - auto conn = std::make_shared(ex); - - run(conn); - - net::steady_timer st{ex}; - st.expires_after(std::chrono::seconds{1}); - - // See NOTE1. - request req0; - req0.push("PING", "async_ignore_explicit_cancel_of_req_written"); - co_await conn->async_exec(req0, gresp); - - request req1; - req1.push("BLPOP", "any", 3); - - bool seen = false; - conn->async_exec(req1, gresp, [&](error_code ec, std::size_t) { - // No error should occur since the cancellation should be ignored - std::cout << "async_exec (1): " << ec.message() << std::endl; - BOOST_TEST(ec == error_code()); - seen = true; - }); - - // Will complete while BLPOP is pending. - error_code ec; - co_await st.async_wait(net::redirect_error(ec)); - conn->cancel(operation::exec); - - BOOST_TEST(ec == error_code()); - - request req2; - req2.push("PING"); - - // Test whether the connection remains usable after a call to - // cancel(exec). - co_await conn->async_exec(req2, gresp, net::redirect_error(ec)); - conn->cancel(); - - BOOST_TEST(ec == error_code()); - BOOST_TEST(seen); -} - -BOOST_AUTO_TEST_CASE(test_ignore_explicit_cancel_of_req_written) -{ - run_coroutine_test(async_ignore_explicit_cancel_of_req_written()); -} - -} // namespace - -#else -BOOST_AUTO_TEST_CASE(dummy) { } -#endif diff --git a/test/test_conn_exec_error.cpp b/test/test_conn_exec_error.cpp index 19638e2d..b77f1299 100644 --- a/test/test_conn_exec_error.cpp +++ b/test/test_conn_exec_error.cpp @@ -21,7 +21,7 @@ using error_code = boost::system::error_code; using boost::redis::connection; using boost::redis::request; using boost::redis::response; -using boost::redis::generic_flat_response; +using boost::redis::generic_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::error; @@ -266,12 +266,12 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax) conn->async_exec(req1, ignore, c1); - generic_flat_response gresp; + generic_response gresp; conn->set_receive_response(gresp); - auto c3 = [&](error_code ec, std::size_t) { + auto c3 = [&](error_code ec) { c3_called = true; - std::cout << "async_receive" << std::endl; + std::cout << "async_receive2" << std::endl; BOOST_TEST(!ec); BOOST_TEST(gresp.has_error()); BOOST_CHECK_EQUAL(gresp.error().data_type, resp3::type::simple_error); @@ -281,7 +281,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax) conn->cancel(operation::reconnection); }; - conn->async_receive(c3); + conn->async_receive2(c3); run(conn); @@ -326,4 +326,4 @@ BOOST_AUTO_TEST_CASE(issue_287_generic_response_error_then_success) BOOST_TEST(resp.error().diagnostic == "ERR wrong number of arguments for 'set' command"); } -} // namespace \ No newline at end of file +} // namespace diff --git a/test/test_conn_monitor.cpp b/test/test_conn_monitor.cpp index 24d22ee1..090c5455 100644 --- a/test/test_conn_monitor.cpp +++ b/test/test_conn_monitor.cpp @@ -41,7 +41,7 @@ class test_monitor { void start_receive() { - conn.async_receive([this](error_code ec, std::size_t) { + conn.async_receive2([this](error_code ec) { // We should expect one push entry, at least BOOST_TEST_EQ(ec, error_code()); BOOST_TEST(monitor_resp.has_value()); @@ -118,4 +118,4 @@ int main() test_monitor{}.run(); return boost::report_errors(); -} \ No newline at end of file +} diff --git a/test/test_conn_push2.cpp b/test/test_conn_push2.cpp new file mode 100644 index 00000000..9e3d7f62 --- /dev/null +++ b/test/test_conn_push2.cpp @@ -0,0 +1,392 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include +#include +#include + +#include +#include + +#include + +#define BOOST_TEST_MODULE conn_push +#include + +#include "common.hpp" + +#include +#include + +namespace net = boost::asio; +namespace redis = boost::redis; + +using boost::redis::operation; +using boost::redis::connection; +using boost::system::error_code; +using boost::redis::request; +using boost::redis::response; +using boost::redis::resp3::flat_tree; +using boost::redis::ignore; +using boost::redis::ignore_t; +using boost::system::error_code; +using boost::redis::logger; +using namespace std::chrono_literals; + +namespace { + +BOOST_AUTO_TEST_CASE(receives_push_waiting_resps) +{ + request req1; + req1.push("HELLO", 3); + req1.push("PING", "Message1"); + + request req2; + req2.push("SUBSCRIBE", "channel"); + + request req3; + req3.push("PING", "Message2"); + req3.push("QUIT"); + + net::io_context ioc; + + auto conn = std::make_shared(ioc); + + bool push_received = false, c1_called = false, c2_called = false, c3_called = false; + + auto c3 = [&](error_code ec, std::size_t) { + c3_called = true; + std::cout << "c3: " << ec.message() << std::endl; + }; + + auto c2 = [&, conn](error_code ec, std::size_t) { + c2_called = true; + BOOST_TEST(ec == error_code()); + conn->async_exec(req3, ignore, c3); + }; + + auto c1 = [&, conn](error_code ec, std::size_t) { + c1_called = true; + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c2); + }; + + conn->async_exec(req1, ignore, c1); + + run(conn, make_test_config(), {}); + + conn->async_receive2([&, conn](error_code ec) { + std::cout << "async_receive2" << std::endl; + BOOST_TEST(ec == error_code()); + push_received = true; + conn->cancel(); + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(push_received); + BOOST_TEST(c1_called); + BOOST_TEST(c2_called); + BOOST_TEST(c3_called); +} + +BOOST_AUTO_TEST_CASE(push_received1) +{ + net::io_context ioc; + auto conn = std::make_shared(ioc); + + flat_tree resp; + conn->set_receive_response(resp); + + // Trick: Uses SUBSCRIBE because this command has no response or + // better said, its response is a server push, which is what we + // want to test. + request req; + req.push("SUBSCRIBE", "channel1"); + req.push("SUBSCRIBE", "channel2"); + + bool push_received = false, exec_finished = false; + + conn->async_exec(req, ignore, [&, conn](error_code ec, std::size_t) { + exec_finished = true; + std::cout << "async_exec" << std::endl; + BOOST_TEST(ec == error_code()); + }); + + conn->async_receive2([&, conn](error_code ec) { + push_received = true; + std::cout << "async_receive2" << std::endl; + + BOOST_TEST(ec == error_code()); + + BOOST_CHECK_EQUAL(resp.get_total_msgs(), 2u); + + conn->cancel(); + }); + + run(conn); + ioc.run_for(test_timeout); + + BOOST_TEST(exec_finished); + BOOST_TEST(push_received); +} + +BOOST_AUTO_TEST_CASE(push_filtered_out) +{ + net::io_context ioc; + auto conn = std::make_shared(ioc); + + request req; + req.push("HELLO", 3); + req.push("PING"); + req.push("SUBSCRIBE", "channel"); + req.push("QUIT"); + + response resp; + + bool exec_finished = false, push_received = false; + + conn->async_exec(req, resp, [conn, &exec_finished](error_code ec, std::size_t) { + exec_finished = true; + BOOST_TEST(ec == error_code()); + }); + + conn->async_receive2([&, conn](error_code ec) { + push_received = true; + BOOST_TEST(ec == error_code()); + conn->cancel(operation::reconnection); + }); + + run(conn); + + ioc.run_for(test_timeout); + BOOST_TEST(exec_finished); + BOOST_TEST(push_received); + + BOOST_CHECK_EQUAL(std::get<1>(resp).value(), "PONG"); + BOOST_CHECK_EQUAL(std::get<2>(resp).value(), "OK"); +} + +struct response_error_tag { }; +response_error_tag error_tag_obj; + +struct response_error_adapter { + void on_init() { } + void on_done() { } + + void on_node( + boost::redis::resp3::basic_node const&, + boost::system::error_code& ec) + { + ec = boost::redis::error::incompatible_size; + } +}; + +auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; } + +BOOST_AUTO_TEST_CASE(test_push_adapter) +{ + net::io_context ioc; + auto conn = std::make_shared(ioc); + + request req; + req.push("HELLO", 3); + req.push("PING"); + req.push("SUBSCRIBE", "channel"); + req.push("PING"); + + conn->set_receive_response(error_tag_obj); + + bool push_received = false, exec_finished = false, run_finished = false; + + conn->async_receive2([&, conn](error_code ec) { + BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled); + push_received = true; + }); + + conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) { + BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled); + exec_finished = true; + }); + + auto cfg = make_test_config(); + cfg.reconnect_wait_interval = 0s; + conn->async_run(cfg, [&run_finished](error_code ec) { + BOOST_CHECK_EQUAL(ec, redis::error::incompatible_size); + run_finished = true; + }); + + ioc.run_for(test_timeout); + BOOST_TEST(push_received); + BOOST_TEST(exec_finished); + BOOST_TEST(run_finished); + + // TODO: Reset the ioc reconnect and send a quit to ensure + // reconnection is possible after an error. +} + +void launch_push_consumer(std::shared_ptr conn) +{ + conn->async_receive2([conn](error_code ec) { + if (ec) { + BOOST_TEST(ec == net::experimental::error::channel_cancelled); + return; + } + launch_push_consumer(conn); + }); +} + +BOOST_AUTO_TEST_CASE(many_subscribers) +{ + request req0; + req0.get_config().cancel_on_connection_lost = false; + req0.push("HELLO", 3); + + request req1; + req1.get_config().cancel_on_connection_lost = false; + req1.push("PING", "Message1"); + + request req2; + req2.get_config().cancel_on_connection_lost = false; + req2.push("SUBSCRIBE", "channel"); + + request req3; + req3.get_config().cancel_on_connection_lost = false; + req3.push("QUIT"); + + net::io_context ioc; + auto conn = std::make_shared(ioc); + + bool finished = false; + + auto c11 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->cancel(operation::reconnection); + finished = true; + }; + auto c10 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req3, ignore, c11); + }; + auto c9 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c10); + }; + auto c8 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req1, ignore, c9); + }; + auto c7 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c8); + }; + auto c6 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c7); + }; + auto c5 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req1, ignore, c6); + }; + auto c4 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c5); + }; + auto c3 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req1, ignore, c4); + }; + auto c2 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c3); + }; + auto c1 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req2, ignore, c2); + }; + auto c0 = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + conn->async_exec(req1, ignore, c1); + }; + + conn->async_exec(req0, ignore, c0); + launch_push_consumer(conn); + + run(conn, make_test_config(), {}); + + ioc.run_for(test_timeout); + BOOST_TEST(finished); +} + +BOOST_AUTO_TEST_CASE(test_unsubscribe) +{ + net::io_context ioc; + connection conn{ioc}; + + // Subscribe to 3 channels and 2 patterns. Use CLIENT INFO to verify this took effect + request req_subscribe; + req_subscribe.push("SUBSCRIBE", "ch1", "ch2", "ch3"); + req_subscribe.push("PSUBSCRIBE", "ch1*", "ch2*"); + req_subscribe.push("CLIENT", "INFO"); + + // Then, unsubscribe from some of them, and verify again + request req_unsubscribe; + req_unsubscribe.push("UNSUBSCRIBE", "ch1"); + req_unsubscribe.push("PUNSUBSCRIBE", "ch2*"); + req_unsubscribe.push("CLIENT", "INFO"); + + // Finally, ping to verify that the connection is still usable + request req_ping; + req_ping.push("PING", "test_unsubscribe"); + + response resp_subscribe, resp_unsubscribe, resp_ping; + + bool subscribe_finished = false, unsubscribe_finished = false, ping_finished = false, + run_finished = false; + + auto on_ping = [&](error_code ec, std::size_t) { + BOOST_TEST(ec == error_code()); + ping_finished = true; + BOOST_TEST(std::get<0>(resp_ping).has_value()); + BOOST_TEST(std::get<0>(resp_ping).value() == "test_unsubscribe"); + conn.cancel(); + }; + + auto on_unsubscribe = [&](error_code ec, std::size_t) { + unsubscribe_finished = true; + BOOST_TEST(ec == error_code()); + BOOST_TEST(std::get<0>(resp_unsubscribe).has_value()); + BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "sub") == "2"); + BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "psub") == "1"); + conn.async_exec(req_ping, resp_ping, on_ping); + }; + + auto on_subscribe = [&](error_code ec, std::size_t) { + subscribe_finished = true; + BOOST_TEST(ec == error_code()); + BOOST_TEST(std::get<0>(resp_subscribe).has_value()); + BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "sub") == "3"); + BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "psub") == "2"); + conn.async_exec(req_unsubscribe, resp_unsubscribe, on_unsubscribe); + }; + + conn.async_exec(req_subscribe, resp_subscribe, on_subscribe); + + conn.async_run(make_test_config(), [&run_finished](error_code ec) { + BOOST_TEST(ec == net::error::operation_aborted); + run_finished = true; + }); + + ioc.run_for(test_timeout); + + BOOST_TEST(subscribe_finished); + BOOST_TEST(unsubscribe_finished); + BOOST_TEST(ping_finished); + BOOST_TEST(run_finished); +} + +} // namespace diff --git a/test/test_issue_50.cpp b/test/test_issue_50.cpp index 98a4c76b..00c04cad 100644 --- a/test/test_issue_50.cpp +++ b/test/test_issue_50.cpp @@ -42,16 +42,15 @@ namespace { // Push consumer auto receiver(std::shared_ptr conn) -> net::awaitable { - std::cout << "uuu" << std::endl; + std::cout << "Entering receiver" << std::endl; while (conn->will_reconnect()) { - std::cout << "dddd" << std::endl; - // Loop reading Redis pushs messages. - for (;;) { - std::cout << "aaaa" << std::endl; - error_code ec; - co_await conn->async_receive(net::redirect_error(ec)); + std::cout << "Reconnect loop" << std::endl; + // Loop reading Redis pushes. + for (error_code ec;;) { + std::cout << "Receive loop" << std::endl; + co_await conn->async_receive2(net::redirect_error(ec)); if (ec) { - std::cout << "Error in async_receive" << std::endl; + std::cout << "Error in async_receive2" << std::endl; break; } } diff --git a/test/test_low_level.cpp b/test/test_low_level.cpp index d26a377a..dc68e0ec 100644 --- a/test/test_low_level.cpp +++ b/test/test_low_level.cpp @@ -29,7 +29,6 @@ using boost::system::error_code; using boost::redis::request; using boost::redis::response; using boost::redis::generic_response; -using boost::redis::generic_flat_response; using boost::redis::ignore; using boost::redis::ignore_t; using boost::redis::adapter::result; @@ -635,7 +634,7 @@ BOOST_AUTO_TEST_CASE(cancel_one_1) BOOST_AUTO_TEST_CASE(cancel_one_empty) { - generic_flat_response resp; + generic_response resp; BOOST_TEST(resp.has_value()); consume_one(resp); @@ -644,7 +643,7 @@ BOOST_AUTO_TEST_CASE(cancel_one_empty) BOOST_AUTO_TEST_CASE(cancel_one_has_error) { - generic_flat_response resp = boost::redis::adapter::error{resp3::type::simple_string, {}}; + generic_response resp = boost::redis::adapter::error{resp3::type::simple_string, {}}; BOOST_TEST(resp.has_error()); consume_one(resp); diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 5ec3e92f..800d304f 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -22,15 +22,19 @@ using boost::redis::request; using boost::redis::adapter::adapt2; using boost::redis::adapter::result; -using boost::redis::generic_response; +using boost::redis::resp3::tree; +using boost::redis::resp3::flat_tree; using boost::redis::ignore_t; using boost::redis::resp3::detail::deserialize; using boost::redis::resp3::node; +using boost::redis::resp3::node_view; using boost::redis::resp3::to_string; using boost::redis::response; using boost::redis::any_adapter; using boost::system::error_code; +namespace resp3 = boost::redis::resp3; + #define RESP3_SET_PART1 "~6\r\n+orange\r" #define RESP3_SET_PART2 "\n+apple\r\n+one" #define RESP3_SET_PART3 "\r\n+two\r" @@ -42,7 +46,9 @@ BOOST_AUTO_TEST_CASE(low_level_sync_sans_io) try { result> resp; - deserialize(resp3_set, adapt2(resp)); + error_code ec; + deserialize(resp3_set, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); for (auto const& e : resp.value()) std::cout << e << std::endl; @@ -65,7 +71,9 @@ BOOST_AUTO_TEST_CASE(issue_210_empty_set) char const* wire = "*4\r\n:1\r\n~0\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; - deserialize(wire, adapt2(resp)); + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK(std::get<1>(resp.value()).value().empty()); @@ -91,7 +99,9 @@ BOOST_AUTO_TEST_CASE(issue_210_non_empty_set_size_one) char const* wire = "*4\r\n:1\r\n~1\r\n$3\r\nfoo\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; - deserialize(wire, adapt2(resp)); + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().size(), 1u); @@ -118,7 +128,9 @@ BOOST_AUTO_TEST_CASE(issue_210_non_empty_set_size_two) char const* wire = "*4\r\n:1\r\n~2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; - deserialize(wire, adapt2(resp)); + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().at(0), std::string{"foo"}); @@ -140,7 +152,9 @@ BOOST_AUTO_TEST_CASE(issue_210_no_nested) char const* wire = "*4\r\n:1\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n"; - deserialize(wire, adapt2(resp)); + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value(), std::string{"foo"}); @@ -159,7 +173,10 @@ BOOST_AUTO_TEST_CASE(issue_233_array_with_null) result>> resp; char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n"; - deserialize(wire, adapt2(resp)); + + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(resp.value().at(0).value(), "one"); BOOST_TEST(!resp.value().at(1).has_value()); @@ -177,7 +194,10 @@ BOOST_AUTO_TEST_CASE(issue_233_optional_array_with_null) result>>> resp; char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n"; - deserialize(wire, adapt2(resp)); + + error_code ec; + deserialize(wire, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); BOOST_CHECK_EQUAL(resp.value().value().at(0).value(), "one"); BOOST_TEST(!resp.value().value().at(1).has_value()); @@ -313,3 +333,116 @@ BOOST_AUTO_TEST_CASE(check_counter_adapter) BOOST_CHECK_EQUAL(node, 7); BOOST_CHECK_EQUAL(done, 1); } + +namespace boost::redis::resp3 { + +template +std::ostream& operator<<(std::ostream& os, basic_node const& nd) +{ + os << "type: " << to_string(nd.data_type) << "\n" + << "aggregate_size: " << nd.aggregate_size << "\n" + << "depth: " << nd.depth << "\n" + << "value: " << nd.value << "\n"; + return os; +} + +template +std::ostream& operator<<(std::ostream& os, basic_tree const& resp) +{ + for (auto const& e: resp) + os << e << ","; + return os; +} + +} + +node from_node_view(node_view const& v) +{ + node ret; + ret.data_type = v.data_type; + ret.aggregate_size = v.aggregate_size; + ret.depth = v.depth; + ret.value = v.value; + return ret; +} + +tree from_flat(flat_tree const& resp) +{ + tree ret; + for (auto const& e: resp.get_view()) + ret.push_back(from_node_view(e)); + + return ret; +} + +// Parses the same data into a tree and a +// flat_tree, they should be equal to each other. +BOOST_AUTO_TEST_CASE(flat_tree_views_are_set) +{ + tree resp1; + flat_tree fresp; + + error_code ec; + deserialize(resp3_set, adapt2(resp1), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + deserialize(resp3_set, adapt2(fresp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + BOOST_CHECK_EQUAL(fresp.get_reallocs(), 1u); + BOOST_CHECK_EQUAL(fresp.get_total_msgs(), 1u); + + auto const resp2 = from_flat(fresp); + BOOST_CHECK_EQUAL(resp1, resp2); +} + +// The response should be reusable. +BOOST_AUTO_TEST_CASE(flat_tree_reuse) +{ + flat_tree tmp; + + // First use + error_code ec; + deserialize(resp3_set, adapt2(tmp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + BOOST_CHECK_EQUAL(tmp.get_reallocs(), 1u); + BOOST_CHECK_EQUAL(tmp.get_total_msgs(), 1u); + + // Copy to compare after the reuse. + auto const resp1 = tmp.get_view(); + tmp.clear(); + + // Second use + deserialize(resp3_set, adapt2(tmp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + // No reallocation this time + BOOST_CHECK_EQUAL(tmp.get_reallocs(), 0u); + BOOST_CHECK_EQUAL(tmp.get_total_msgs(), 1u); + + BOOST_CHECK_EQUAL(resp1, tmp.get_view()); +} + +BOOST_AUTO_TEST_CASE(flat_tree_copy_assign) +{ + flat_tree resp; + + error_code ec; + deserialize(resp3_set, adapt2(resp), ec); + BOOST_CHECK_EQUAL(ec, error_code{}); + + // Copy + resp3::flat_tree copy1{resp}; + + // Copy assignment + resp3::flat_tree copy2 = resp; + + // Assignment + resp3::flat_tree copy3; + copy3 = resp; + + BOOST_TEST((copy1 == resp)); + BOOST_TEST((copy2 == resp)); + BOOST_TEST((copy3 == resp)); +}