2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Merge pull request #340 from boostorg/263-marcelo

Concludes the work on the generic_flat_response started by Nikolai Vladimirov
This commit is contained in:
Marcelo
2025-11-18 14:10:23 +01:00
committed by GitHub
23 changed files with 1255 additions and 136 deletions

View File

@@ -87,9 +87,9 @@ them are:
* [Client-side caching](https://redis.io/docs/manual/client-side-caching/). * [Client-side caching](https://redis.io/docs/manual/client-side-caching/).
The connection class supports server pushes by means of the The connection class supports server pushes by means of the
`connection::async_receive` function, which can be `connection::async_receive2` function, which can be
called in the same connection that is being used to execute commands. called in the same connection that is being used to execute commands.
The coroutine below shows how to use it: The coroutine below shows how to use it
```cpp ```cpp
@@ -99,26 +99,25 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
request req; request req;
req.push("SUBSCRIBE", "channel"); req.push("SUBSCRIBE", "channel");
generic_response resp; flat_tree resp;
conn->set_receive_response(resp); conn->set_receive_response(resp);
// Loop while reconnection is enabled // Loop while reconnection is enabled
while (conn->will_reconnect()) { while (conn->will_reconnect()) {
// Reconnect to channels. // Reconnect to channels.
co_await conn->async_exec(req, ignore); co_await conn->async_exec(req);
// Loop reading Redis pushes. // Loop reading Redis pushes.
for (;;) { for (error_code ec;;) {
error_code ec; co_await conn->async_receive2(resp, redirect_error(ec));
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
if (ec) if (ec)
break; // Connection lost, break so we can reconnect to channels. break; // Connection lost, break so we can reconnect to channels.
// Use the response resp in some way and then clear it. // Use the response resp in some way and then clear it.
... ...
consume_one(resp); resp.clear();
} }
} }
} }
@@ -126,4 +125,4 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
## Further reading ## Further reading
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html). Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).

View File

@@ -97,9 +97,9 @@ them are:
* https://redis.io/docs/manual/client-side-caching/[Client-side caching]. * https://redis.io/docs/manual/client-side-caching/[Client-side caching].
The connection class supports server pushes by means of the The connection class supports server pushes by means of the
xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive`] function, which can be xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive2`] function, which can be
called in the same connection that is being used to execute commands. called in the same connection that is being used to execute commands.
The coroutine below shows how to use it: The coroutine below shows how to use it
[source,cpp] [source,cpp]
@@ -110,26 +110,25 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
request req; request req;
req.push("SUBSCRIBE", "channel"); req.push("SUBSCRIBE", "channel");
generic_response resp; flat_tree resp;
conn->set_receive_response(resp); conn->set_receive_response(resp);
// Loop while reconnection is enabled // Loop while reconnection is enabled
while (conn->will_reconnect()) { while (conn->will_reconnect()) {
// Reconnect to channels. // Reconnect to channels.
co_await conn->async_exec(req, ignore); co_await conn->async_exec(req);
// Loop reading Redis pushes. // Loop reading Redis pushes.
for (;;) { for (error_code ec;;) {
error_code ec; co_await conn->async_receive2(resp, redirect_error(ec));
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
if (ec) if (ec)
break; // Connection lost, break so we can reconnect to channels. break; // Connection lost, break so we can reconnect to channels.
// Use the response resp in some way and then clear it. // Use the response here and then clear it.
... ...
consume_one(resp); resp.clear();
} }
} }
} }

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
* *
* Distributed under the Boost Software License, Version 1.0. (See * Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt) * accompanying file LICENSE.txt)
@@ -30,11 +30,9 @@ using boost::asio::consign;
using boost::asio::detached; using boost::asio::detached;
using boost::asio::dynamic_buffer; using boost::asio::dynamic_buffer;
using boost::asio::redirect_error; using boost::asio::redirect_error;
using boost::asio::use_awaitable;
using boost::redis::config; using boost::redis::config;
using boost::redis::connection; using boost::redis::connection;
using boost::redis::generic_response; using boost::redis::generic_flat_response;
using boost::redis::ignore;
using boost::redis::request; using boost::redis::request;
using boost::system::error_code; using boost::system::error_code;
using namespace std::chrono_literals; using namespace std::chrono_literals;
@@ -47,20 +45,24 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
request req; request req;
req.push("SUBSCRIBE", "channel"); req.push("SUBSCRIBE", "channel");
generic_response resp; generic_flat_response resp;
conn->set_receive_response(resp); conn->set_receive_response(resp);
while (conn->will_reconnect()) { while (conn->will_reconnect()) {
// Subscribe to channels. // Subscribe to channels.
co_await conn->async_exec(req, ignore); co_await conn->async_exec(req);
// Loop reading Redis push messages. // Loop reading Redis push messages.
for (error_code ec;;) { for (error_code ec;;) {
co_await conn->async_receive(redirect_error(use_awaitable, ec)); co_await conn->async_receive2(redirect_error(ec));
if (ec) if (ec)
break; // Connection lost, break so we can reconnect to channels. break; // Connection lost, break so we can reconnect to channels.
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
<< resp.value().at(3).value << std::endl; for (auto const& elem: resp.value().get_view())
std::cout << elem.value << "\n";
std::cout << std::endl;
resp.value().clear(); resp.value().clear();
} }
} }
@@ -74,7 +76,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n"); auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
request req; request req;
req.push("PUBLISH", "channel", msg); req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req, ignore); co_await conn->async_exec(req);
msg.erase(0, n); msg.erase(0, n);
} }
} }

View File

@@ -1,11 +1,10 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
* *
* Distributed under the Boost Software License, Version 1.0. (See * Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt) * accompanying file LICENSE.txt)
*/ */
#include <boost/redis/connection.hpp> #include <boost/redis/connection.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/awaitable.hpp> #include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp> #include <boost/asio/co_spawn.hpp>
@@ -22,12 +21,8 @@
namespace asio = boost::asio; namespace asio = boost::asio;
using namespace std::chrono_literals; using namespace std::chrono_literals;
using boost::redis::request; using boost::redis::request;
using boost::redis::generic_response; using boost::redis::generic_flat_response;
using boost::redis::consume_one;
using boost::redis::logger;
using boost::redis::config; using boost::redis::config;
using boost::redis::ignore;
using boost::redis::error;
using boost::system::error_code; using boost::system::error_code;
using boost::redis::connection; using boost::redis::connection;
using asio::signal_set; using asio::signal_set;
@@ -54,30 +49,29 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
request req; request req;
req.push("SUBSCRIBE", "channel"); req.push("SUBSCRIBE", "channel");
generic_response resp; generic_flat_response resp;
conn->set_receive_response(resp); conn->set_receive_response(resp);
// Loop while reconnection is enabled // Loop while reconnection is enabled
while (conn->will_reconnect()) { while (conn->will_reconnect()) {
// Reconnect to the channels. // Reconnect to the channels.
co_await conn->async_exec(req, ignore); co_await conn->async_exec(req);
// Loop reading Redis pushs messages. // Loop to read Redis push messages.
for (error_code ec;;) { for (error_code ec;;) {
// First tries to read any buffered pushes. // Wait for pushes
conn->receive(ec); co_await conn->async_receive2(asio::redirect_error(ec));
if (ec == error::sync_receive_push_failed) {
ec = {};
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
}
if (ec) if (ec)
break; // Connection lost, break so we can reconnect to channels. break; // Connection lost, break so we can reconnect to channels.
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " " // The response must be consumed without suspending the
<< resp.value().at(3).value << std::endl; // coroutine i.e. without the use of async operations.
for (auto const& elem: resp.value().get_view())
std::cout << elem.value << "\n";
consume_one(resp); std::cout << std::endl;
resp.value().clear();
} }
} }
} }

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
* *
* Distributed under the Boost Software License, Version 1.0. (See * Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt) * accompanying file LICENSE.txt)
@@ -12,6 +12,8 @@
#include <boost/redis/resp3/node.hpp> #include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/serialization.hpp> #include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/type.hpp> #include <boost/redis/resp3/type.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/redis/response.hpp>
#include <boost/assert.hpp> #include <boost/assert.hpp>
@@ -176,6 +178,97 @@ public:
} }
}; };
template <>
class general_aggregate<resp3::tree> {
private:
resp3::tree* tree_ = nullptr;
public:
explicit general_aggregate(resp3::tree* c = nullptr)
: tree_(c)
{ }
void on_init() { }
void on_done() { }
template <class String>
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
{
BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer");
resp3::node tmp;
tmp.data_type = nd.data_type;
tmp.aggregate_size = nd.aggregate_size;
tmp.depth = nd.depth;
tmp.value = std::string{std::cbegin(nd.value), std::cend(nd.value)};
tree_->push_back(std::move(tmp));
}
};
template <>
class general_aggregate<generic_flat_response> {
private:
generic_flat_response* tree_ = nullptr;
public:
explicit general_aggregate(generic_flat_response* c = nullptr)
: tree_(c)
{ }
void on_init() { }
void on_done()
{
BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer");
if (tree_->has_value()) {
tree_->value().notify_done();
}
}
template <class String>
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
{
BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer");
switch (nd.data_type) {
case resp3::type::blob_error:
case resp3::type::simple_error:
*tree_ = error{
nd.data_type,
std::string{std::cbegin(nd.value), std::cend(nd.value)}
};
break;
default:
if (tree_->has_value()) {
(**tree_).push(nd);
}
}
}
};
template <>
class general_aggregate<resp3::flat_tree> {
private:
resp3::flat_tree* tree_ = nullptr;
public:
explicit general_aggregate(resp3::flat_tree* c = nullptr)
: tree_(c)
{ }
void on_init() { }
void on_done()
{
tree_->notify_done();
}
template <class String>
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
{
BOOST_ASSERT_MSG(!!tree_, "Unexpected null pointer");
tree_->push(nd);
}
};
template <class Node> template <class Node>
class general_simple { class general_simple {
private: private:

View File

@@ -92,8 +92,32 @@ struct response_traits<result<ignore_t>> {
}; };
template <class String, class Allocator> template <class String, class Allocator>
struct response_traits<result<std::vector<resp3::basic_node<String>, Allocator>>> { struct response_traits<result<resp3::basic_tree<String, Allocator>>> {
using response_type = result<std::vector<resp3::basic_node<String>, Allocator>>; using response_type = result<resp3::basic_tree<String, Allocator>>;
using adapter_type = general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <class String>
struct response_traits<resp3::basic_tree<String>> {
using response_type = resp3::basic_tree<String>;
using adapter_type = general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <>
struct response_traits<resp3::flat_tree> {
using response_type = resp3::flat_tree;
using adapter_type = general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <>
struct response_traits<generic_flat_response> {
using response_type = generic_flat_response;
using adapter_type = general_aggregate<response_type>; using adapter_type = general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }

View File

@@ -13,6 +13,8 @@
#include <boost/redis/error.hpp> #include <boost/redis/error.hpp>
#include <boost/redis/ignore.hpp> #include <boost/redis/ignore.hpp>
#include <boost/redis/resp3/type.hpp> #include <boost/redis/resp3/type.hpp>
#include <boost/redis/resp3/tree.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/mp11.hpp> #include <boost/mp11.hpp>
@@ -56,12 +58,33 @@ struct result_traits<result<resp3::basic_node<T>>> {
}; };
template <class String, class Allocator> template <class String, class Allocator>
struct result_traits<result<std::vector<resp3::basic_node<String>, Allocator>>> { struct result_traits<result<resp3::basic_tree<String, Allocator>>> {
using response_type = result<std::vector<resp3::basic_node<String>, Allocator>>; using response_type = result<std::vector<resp3::basic_node<String>, Allocator>>;
using adapter_type = adapter::detail::general_aggregate<response_type>; using adapter_type = adapter::detail::general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; } static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
}; };
template <class String>
struct result_traits<resp3::basic_tree<String>> {
using response_type = resp3::basic_tree<String>;
using adapter_type = adapter::detail::general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <>
struct result_traits<generic_flat_response> {
using response_type = generic_flat_response;
using adapter_type = general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <>
struct result_traits<resp3::flat_tree> {
using response_type = resp3::flat_tree;
using adapter_type = general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <class T> template <class T>
using adapter_t = typename result_traits<std::decay_t<T>>::adapter_type; using adapter_t = typename result_traits<std::decay_t<T>>::adapter_type;

View File

@@ -207,6 +207,50 @@ struct connection_impl {
{ {
st_.mpx.set_receive_adapter(std::move(adapter)); st_.mpx.set_receive_adapter(std::move(adapter));
} }
std::size_t receive(system::error_code& ec)
{
std::size_t size = 0;
auto f = [&](system::error_code const& ec2, std::size_t n) {
ec = ec2;
size = n;
};
auto const res = receive_channel_.try_receive(f);
if (ec)
return 0;
if (!res)
ec = error::sync_receive_push_failed;
return size;
}
template <class CompletionToken>
auto async_receive2(CompletionToken&& token)
{
return
receive_channel_.async_receive(
asio::deferred(
[this](system::error_code ec, std::size_t)
{
if (!ec) {
auto f = [](system::error_code, std::size_t) {
// There is no point in checking for errors
// here since async_receive just completed
// without errors.
};
// We just want to drain the channel.
while (receive_channel_.try_receive(f));
}
return asio::deferred.values(ec);
}
)
)(std::forward<CompletionToken>(token));
}
}; };
template <class Executor> template <class Executor>
@@ -593,7 +637,7 @@ public:
return async_run(config{}, std::forward<CompletionToken>(token)); return async_run(config{}, std::forward<CompletionToken>(token));
} }
/** @brief Receives server side pushes asynchronously. /** @brief (Deprecated) Receives server side pushes asynchronously.
* *
* When pushes arrive and there is no `async_receive` operation in * When pushes arrive and there is no `async_receive` operation in
* progress, pushed data, requests, and responses will be paused * progress, pushed data, requests, and responses will be paused
@@ -623,12 +667,57 @@ public:
* @param token Completion token. * @param token Completion token.
*/ */
template <class CompletionToken = asio::default_completion_token_t<executor_type>> template <class CompletionToken = asio::default_completion_token_t<executor_type>>
BOOST_DEPRECATED("Please use async_receive2 instead.")
auto async_receive(CompletionToken&& token = {}) auto async_receive(CompletionToken&& token = {})
{ {
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token)); return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
} }
/** @brief Receives server pushes synchronously without blocking. /** @brief Wait for server pushes asynchronously
*
* This function suspends until a server push is received by the
* connection. On completion an unspecified number of pushes will
* have been added to the response object set with @ref
* boost::redis::connection::set_receive_response.
*
* To prevent receiving an unbound number of pushes the connection
* blocks further read operations on the socket when 256 pushes
* accumulate internally (we don't make any commitment to this
* exact number). When that happens any `async_exec`s and
* health-checks won't make any progress and the connection may
* eventually timeout. To avoid that Apps should call
* `async_receive2` continuously in a loop.
*
* @Note To avoid deadlocks the task (e.g. coroutine) calling
* `async_receive2` should not call `async_exec` in a way where
* they could block each other.
*
* For an example see cpp20_subscriber.cpp. The completion token
* must have the following signature
*
* @code
* void f(system::error_code);
* @endcode
*
* @par Per-operation cancellation
* This operation supports the following cancellation types:
*
* @li `asio::cancellation_type_t::terminal`.
* @li `asio::cancellation_type_t::partial`.
* @li `asio::cancellation_type_t::total`.
*
* Calling `basic_connection::cancel(operation::receive)` will
* also cancel any ongoing receive operations.
*
* @param token Completion token.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
auto async_receive2(CompletionToken&& token = {})
{
return impl_->async_receive2(std::forward<CompletionToken>(token));
}
/** @brief (Deprecated) Receives server pushes synchronously without blocking.
* *
* Receives a server push synchronously by calling `try_receive` on * Receives a server push synchronously by calling `try_receive` on
* the underlying channel. If the operation fails because * the underlying channel. If the operation fails because
@@ -638,23 +727,10 @@ public:
* @param ec Contains the error if any occurred. * @param ec Contains the error if any occurred.
* @returns The number of bytes read from the socket. * @returns The number of bytes read from the socket.
*/ */
BOOST_DEPRECATED("Please, use async_receive2 instead.")
std::size_t receive(system::error_code& ec) std::size_t receive(system::error_code& ec)
{ {
std::size_t size = 0; return impl_->receive(ec);
auto f = [&](system::error_code const& ec2, std::size_t n) {
ec = ec2;
size = n;
};
auto const res = impl_->receive_channel_.try_receive(f);
if (ec)
return 0;
if (!res)
ec = error::sync_receive_push_failed;
return size;
} }
/** @brief Executes commands on the Redis server asynchronously. /** @brief Executes commands on the Redis server asynchronously.
@@ -837,7 +913,7 @@ public:
"the other member functions to interact with the connection.") "the other member functions to interact with the connection.")
auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); } auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); }
/// Sets the response object of @ref async_receive operations. /// Sets the response object of @ref async_receive2 operations.
template <class Response> template <class Response>
void set_receive_response(Response& resp) void set_receive_response(Response& resp)
{ {
@@ -1028,13 +1104,25 @@ public:
/// @copydoc basic_connection::async_receive /// @copydoc basic_connection::async_receive
template <class CompletionToken = asio::deferred_t> template <class CompletionToken = asio::deferred_t>
BOOST_DEPRECATED("Please use async_receive2 instead.")
auto async_receive(CompletionToken&& token = {}) auto async_receive(CompletionToken&& token = {})
{ {
return impl_.async_receive(std::forward<CompletionToken>(token)); return impl_.async_receive(std::forward<CompletionToken>(token));
} }
/// @copydoc basic_connection::async_receive2
template <class CompletionToken = asio::deferred_t>
auto async_receive2(CompletionToken&& token = {})
{
return impl_.async_receive2(std::forward<CompletionToken>(token));
}
/// @copydoc basic_connection::receive /// @copydoc basic_connection::receive
std::size_t receive(system::error_code& ec) { return impl_.receive(ec); } BOOST_DEPRECATED("Please use async_receive2 instead.")
std::size_t receive(system::error_code& ec)
{
return impl_.impl_->receive(ec);
}
/** /**
* @brief Calls @ref boost::redis::basic_connection::async_exec. * @brief Calls @ref boost::redis::basic_connection::async_exec.

View File

@@ -74,7 +74,7 @@ enum class error
/// SSL handshake timeout /// SSL handshake timeout
ssl_handshake_timeout, ssl_handshake_timeout,
/// Can't receive push synchronously without blocking /// (Deprecated) Can't receive push synchronously without blocking
sync_receive_push_failed, sync_receive_push_failed,
/// Incompatible node depth. /// Incompatible node depth.

View File

@@ -0,0 +1,125 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Nikolai Vladimirov (nvladimirov.work@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/assert.hpp>
namespace boost::redis::resp3 {
flat_tree::flat_tree(flat_tree const& other)
: data_{other.data_}
, view_tree_{other.view_tree_}
, ranges_{other.ranges_}
, pos_{0u}
, reallocs_{0u}
, total_msgs_{other.total_msgs_}
{
view_tree_.resize(ranges_.size());
set_views();
}
flat_tree&
flat_tree::operator=(flat_tree other)
{
swap(*this, other);
return *this;
}
void flat_tree::reserve(std::size_t bytes, std::size_t nodes)
{
data_.reserve(bytes);
view_tree_.reserve(nodes);
ranges_.reserve(nodes);
}
void flat_tree::clear()
{
pos_ = 0u;
total_msgs_ = 0u;
reallocs_ = 0u;
data_.clear();
view_tree_.clear();
ranges_.clear();
}
void flat_tree::set_views()
{
BOOST_ASSERT_MSG(pos_ < view_tree_.size(), "notify_done called but no nodes added.");
BOOST_ASSERT_MSG(view_tree_.size() == ranges_.size(), "Incompatible sizes.");
for (; pos_ < view_tree_.size(); ++pos_) {
auto const& r = ranges_.at(pos_);
view_tree_.at(pos_).value = std::string_view{data_.data() + r.offset, r.size};
}
}
void flat_tree::notify_done()
{
total_msgs_ += 1;
set_views();
}
void flat_tree::push(node_view const& node)
{
auto data_before = data_.data();
add_node_impl(node);
auto data_after = data_.data();
if (data_after != data_before) {
pos_ = 0;
reallocs_ += 1;
}
}
void flat_tree::add_node_impl(node_view const& node)
{
ranges_.push_back({data_.size(), node.value.size()});
// This must come after setting the offset above.
data_.append(node.value.data(), node.value.size());
view_tree_.push_back(node);
}
void swap(flat_tree& a, flat_tree& b)
{
using std::swap;
swap(a.data_, b.data_);
swap(a.view_tree_, b.view_tree_);
swap(a.ranges_, b.ranges_);
swap(a.pos_, b.pos_);
swap(a.reallocs_, b.reallocs_);
swap(a.total_msgs_, b.total_msgs_);
}
bool
operator==(
flat_tree::range const& a,
flat_tree::range const& b)
{
return a.offset == b.offset && a.size == b.size;
}
bool operator==(flat_tree const& a, flat_tree const& b)
{
return
a.data_ == b.data_ &&
a.view_tree_ == b.view_tree_ &&
a.ranges_ == b.ranges_ &&
a.pos_ == b.pos_ &&
//a.reallocs_ == b.reallocs_ &&
a.total_msgs_ == b.total_msgs_;
}
bool operator!=(flat_tree const& a, flat_tree const& b)
{
return !(a == b);
}
} // namespace boost::redis

View File

@@ -0,0 +1,131 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Nikolai Vladimirov (nvladimirov.work@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_REDIS_RESP3_FLAT_TREE_HPP
#define BOOST_REDIS_RESP3_FLAT_TREE_HPP
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/tree.hpp>
#include <string>
#include <vector>
namespace boost::redis {
namespace adapter::detail {
template <class> class general_aggregate;
}
namespace resp3 {
/** @brief A generic-response that stores data contiguously
*
* Similar to the @ref boost::redis::resp3::tree but data is
* stored contiguously.
*/
struct flat_tree {
public:
/// Default constructor
flat_tree() = default;
/// Move constructor
flat_tree(flat_tree&&) noexcept = default;
/// Copy constructor
flat_tree(flat_tree const& other);
/// Copy assignment
flat_tree& operator=(flat_tree other);
friend void swap(flat_tree&, flat_tree&);
friend
bool operator==(flat_tree const&, flat_tree const&);
friend
bool operator!=(flat_tree const&, flat_tree const&);
/** @brief Reserve capacity
*
* Reserve memory for incoming data.
*
* @param bytes Number of bytes to reserve for data.
* @param nodes Number of nodes to reserve.
*/
void reserve(std::size_t bytes, std::size_t nodes);
/** @brief Clear both the data and the node buffers
*
* @Note: A `boost::redis:.flat_tree` can contain the
* response to multiple Redis commands and server pushes. Calling
* this function will erase everything contained in it.
*/
void clear();
/// Returns the size of the data buffer
auto data_size() const noexcept -> std::size_t
{ return data_.size(); }
/// Returns the RESP3 response
auto get_view() const -> view_tree const&
{ return view_tree_; }
/** @brief Returns the number of times reallocation took place
*
* This function returns how many reallocations were performed and
* can be useful to determine how much memory to reserve upfront.
*/
auto get_reallocs() const noexcept -> std::size_t
{ return reallocs_; }
/// Returns the number of complete RESP3 messages contained in this object.
std::size_t get_total_msgs() const noexcept
{ return total_msgs_; }
private:
template <class> friend class adapter::detail::general_aggregate;
// Notify the object that all nodes were pushed.
void notify_done();
// Push a new node to the response
void push(node_view const& node);
void add_node_impl(node_view const& node);
void set_views();
// Range into the data buffer.
struct range {
std::size_t offset;
std::size_t size;
};
friend bool operator==(range const&, range const&);
std::string data_;
view_tree view_tree_;
std::vector<range> ranges_;
std::size_t pos_ = 0u;
std::size_t reallocs_ = 0u;
std::size_t total_msgs_ = 0u;
};
/// Swaps two responses
void swap(flat_tree&, flat_tree&);
/// Equality operator
bool operator==(flat_tree const&, flat_tree const&);
/// Inequality operator
bool operator!=(flat_tree const&, flat_tree const&);
} // resp3
} // namespace boost::redis
#endif // BOOST_REDIS_RESP3_FLAT_TREE_HPP

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
* *
* Distributed under the Boost Software License, Version 1.0. (See * Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt) * accompanying file LICENSE.txt)
@@ -53,6 +53,13 @@ auto operator==(basic_node<String> const& a, basic_node<String> const& b)
// clang-format on // clang-format on
}; };
/// Inequality operator for RESP3 nodes
template <class String>
auto operator!=(basic_node<String> const& a, basic_node<String> const& b)
{
return !(a == b);
};
/// A node in the response tree that owns its data. /// A node in the response tree that owns its data.
using node = basic_node<std::string>; using node = basic_node<std::string>;

View File

@@ -0,0 +1,29 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_RESP3_TREE_HPP
#define BOOST_REDIS_RESP3_TREE_HPP
#include <boost/redis/resp3/node.hpp>
#include <vector>
#include <string_view>
namespace boost::redis::resp3 {
/// A RESP3 tree that owns its data.
template <class String, class Allocator = std::allocator<basic_node<String>>>
using basic_tree = std::vector<basic_node<String>, Allocator>;
/// A RESP3 tree that owns its data.
using tree = basic_tree<std::string>;
/// A RESP3 tree whose data are `std::string_views`.
using view_tree = basic_tree<std::string_view>;
}
#endif // BOOST_REDIS_RESP3_RESPONSE_HPP

View File

@@ -9,6 +9,8 @@
#include <boost/redis/adapter/result.hpp> #include <boost/redis/adapter/result.hpp>
#include <boost/redis/resp3/node.hpp> #include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/tree.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
@@ -29,9 +31,12 @@ using response = std::tuple<adapter::result<Ts>...>;
* [pre-order](https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR) * [pre-order](https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR)
* view of the response tree. * view of the response tree.
*/ */
using generic_response = adapter::result<std::vector<resp3::node>>; using generic_response = adapter::result<resp3::tree>;
/** @brief Consume on response from a generic response /// Similar to @ref boost::redis::generic_response but stores data contiguously.
using generic_flat_response = adapter::result<resp3::flat_tree>;
/** @brief (Deprecated) Consume on response from a generic response
* *
* This function rotates the elements so that the start of the next * This function rotates the elements so that the start of the next
* response becomes the new front element. For example the output of * response becomes the new front element. For example the output of
@@ -70,13 +75,15 @@ using generic_response = adapter::result<std::vector<resp3::node>>;
* @param r The response to modify. * @param r The response to modify.
* @param ec Will be populated in case of error. * @param ec Will be populated in case of error.
*/ */
//BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.")
void consume_one(generic_response& r, system::error_code& ec); void consume_one(generic_response& r, system::error_code& ec);
/** /**
* @brief Throwing overload of `consume_one`. * @brief (Deprecated) Throwing overload of `consume_one`.
* *
* @param r The response to modify. * @param r The response to modify.
*/ */
//BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.")
void consume_one(generic_response& r); void consume_one(generic_response& r);
} // namespace boost::redis } // namespace boost::redis

View File

@@ -17,6 +17,7 @@
#include <boost/redis/impl/response.ipp> #include <boost/redis/impl/response.ipp>
#include <boost/redis/impl/run_fsm.ipp> #include <boost/redis/impl/run_fsm.ipp>
#include <boost/redis/impl/writer_fsm.ipp> #include <boost/redis/impl/writer_fsm.ipp>
#include <boost/redis/impl/flat_tree.ipp>
#include <boost/redis/resp3/impl/parser.ipp> #include <boost/redis/resp3/impl/parser.ipp>
#include <boost/redis/resp3/impl/serialization.ipp> #include <boost/redis/resp3/impl/serialization.ipp>
#include <boost/redis/resp3/impl/type.ipp> #include <boost/redis/resp3/impl/type.ipp>

View File

@@ -56,6 +56,7 @@ make_test(test_conn_run_cancel)
make_test(test_conn_check_health) make_test(test_conn_check_health)
make_test(test_conn_exec) make_test(test_conn_exec)
make_test(test_conn_push) make_test(test_conn_push)
make_test(test_conn_push2)
make_test(test_conn_monitor) make_test(test_conn_monitor)
make_test(test_conn_reconnect) make_test(test_conn_reconnect)
make_test(test_conn_exec_cancel) make_test(test_conn_exec_cancel)

View File

@@ -13,6 +13,7 @@
#include <boost/test/included/unit_test.hpp> #include <boost/test/included/unit_test.hpp>
using boost::redis::generic_response; using boost::redis::generic_response;
using boost::redis::resp3::flat_tree;
using boost::redis::response; using boost::redis::response;
using boost::redis::ignore; using boost::redis::ignore;
using boost::redis::any_adapter; using boost::redis::any_adapter;
@@ -24,10 +25,12 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types)
response<int> r1; response<int> r1;
response<int, std::string> r2; response<int, std::string> r2;
generic_response r3; generic_response r3;
flat_tree r4;
BOOST_CHECK_NO_THROW(any_adapter{r1}); BOOST_CHECK_NO_THROW(any_adapter{r1});
BOOST_CHECK_NO_THROW(any_adapter{r2}); BOOST_CHECK_NO_THROW(any_adapter{r2});
BOOST_CHECK_NO_THROW(any_adapter{r3}); BOOST_CHECK_NO_THROW(any_adapter{r3});
BOOST_CHECK_NO_THROW(any_adapter{r4});
BOOST_CHECK_NO_THROW(any_adapter{ignore}); BOOST_CHECK_NO_THROW(any_adapter{ignore});
} }

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
* *
* Distributed under the Boost Software License, Version 1.0. (See * Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt) * accompanying file LICENSE.txt)
@@ -27,6 +27,7 @@ using error_code = boost::system::error_code;
using boost::redis::operation; using boost::redis::operation;
using boost::redis::request; using boost::redis::request;
using boost::redis::response; using boost::redis::response;
using boost::redis::resp3::flat_tree;
using boost::redis::ignore; using boost::redis::ignore;
using boost::redis::ignore_t; using boost::redis::ignore_t;
using boost::redis::logger; using boost::redis::logger;
@@ -54,39 +55,44 @@ std::ostream& operator<<(std::ostream& os, usage const& u)
namespace { namespace {
auto push_consumer(connection& conn, int expected) -> net::awaitable<void> auto
receiver(
connection& conn,
flat_tree& resp,
std::size_t expected) -> net::awaitable<void>
{ {
int c = 0; std::size_t push_counter = 0;
for (error_code ec;;) { while (push_counter != expected) {
conn.receive(ec); co_await conn.async_receive2();
if (ec == error::sync_receive_push_failed) { push_counter += resp.get_total_msgs();
ec = {}; resp.clear();
co_await conn.async_receive(net::redirect_error(ec));
} else if (!ec) {
//std::cout << "Skipping suspension." << std::endl;
}
if (ec) {
BOOST_TEST(false, "push_consumer error: " << ec.message());
co_return;
}
if (++c == expected)
break;
} }
conn.cancel(); conn.cancel();
} }
auto echo_session(connection& conn, const request& pubs, int n) -> net::awaitable<void> auto echo_session(connection& conn, const request& req, std::size_t n) -> net::awaitable<void>
{ {
for (auto i = 0; i < n; ++i) for (auto i = 0u; i < n; ++i)
co_await conn.async_exec(pubs); co_await conn.async_exec(req);
} }
void rethrow_on_error(std::exception_ptr exc) void rethrow_on_error(std::exception_ptr exc)
{ {
if (exc) if (exc) {
BOOST_TEST(false);
std::rethrow_exception(exc); std::rethrow_exception(exc);
}
}
request make_pub_req(std::size_t n_pubs)
{
request req;
req.push("PING");
for (std::size_t i = 0u; i < n_pubs; ++i)
req.push("PUBLISH", "channel", "payload");
return req;
} }
BOOST_AUTO_TEST_CASE(echo_stress) BOOST_AUTO_TEST_CASE(echo_stress)
@@ -98,22 +104,22 @@ BOOST_AUTO_TEST_CASE(echo_stress)
// Number of coroutines that will send pings sharing the same // Number of coroutines that will send pings sharing the same
// connection to redis. // connection to redis.
constexpr int sessions = 150; constexpr std::size_t sessions = 150u;
// The number of pings that will be sent by each session. // The number of pings that will be sent by each session.
constexpr int msgs = 200; constexpr std::size_t msgs = 200u;
// The number of publishes that will be sent by each session with // The number of publishes that will be sent by each session with
// each message. // each message.
constexpr int n_pubs = 25; constexpr std::size_t n_pubs = 25u;
// This is the total number of pushes we will receive. // This is the total number of pushes we will receive.
constexpr int total_pushes = sessions * msgs * n_pubs + 1; constexpr std::size_t total_pushes = sessions * msgs * n_pubs + 1;
request pubs; flat_tree resp;
pubs.push("PING"); conn.set_receive_response(resp);
for (int i = 0; i < n_pubs; ++i)
pubs.push("PUBLISH", "channel", "payload"); request const pub_req = make_pub_req(n_pubs);
// Run the connection // Run the connection
bool run_finished = false, subscribe_finished = false; bool run_finished = false, subscribe_finished = false;
@@ -123,6 +129,10 @@ BOOST_AUTO_TEST_CASE(echo_stress)
std::clog << "async_run finished" << std::endl; std::clog << "async_run finished" << std::endl;
}); });
// Op that will consume the pushes counting down until all expected
// pushes have been received.
net::co_spawn(ctx, receiver(conn, resp, total_pushes), rethrow_on_error);
// Subscribe, then launch the coroutines // Subscribe, then launch the coroutines
request req; request req;
req.push("SUBSCRIBE", "channel"); req.push("SUBSCRIBE", "channel");
@@ -130,12 +140,8 @@ BOOST_AUTO_TEST_CASE(echo_stress)
subscribe_finished = true; subscribe_finished = true;
BOOST_TEST(ec == error_code()); BOOST_TEST(ec == error_code());
// Op that will consume the pushes counting down until all expected for (std::size_t i = 0; i < sessions; ++i)
// pushes have been received. net::co_spawn(ctx, echo_session(conn, pub_req, msgs), rethrow_on_error);
net::co_spawn(ctx, push_consumer(conn, total_pushes), rethrow_on_error);
for (int i = 0; i < sessions; ++i)
net::co_spawn(ctx, echo_session(conn, pubs, msgs), rethrow_on_error);
}); });
// Run the test // Run the test
@@ -144,7 +150,13 @@ BOOST_AUTO_TEST_CASE(echo_stress)
BOOST_TEST(subscribe_finished); BOOST_TEST(subscribe_finished);
// Print statistics // Print statistics
std::cout << "-------------------\n" << conn.get_usage() << std::endl; std::cout
<< "-------------------\n"
<< "Usage data: \n"
<< conn.get_usage() << "\n"
<< "-------------------\n"
<< "Reallocations: " << resp.get_reallocs()
<< std::endl;
} }
} // namespace } // namespace

View File

@@ -269,9 +269,9 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
generic_response gresp; generic_response gresp;
conn->set_receive_response(gresp); conn->set_receive_response(gresp);
auto c3 = [&](error_code ec, std::size_t) { auto c3 = [&](error_code ec) {
c3_called = true; c3_called = true;
std::cout << "async_receive" << std::endl; std::cout << "async_receive2" << std::endl;
BOOST_TEST(!ec); BOOST_TEST(!ec);
BOOST_TEST(gresp.has_error()); BOOST_TEST(gresp.has_error());
BOOST_CHECK_EQUAL(gresp.error().data_type, resp3::type::simple_error); BOOST_CHECK_EQUAL(gresp.error().data_type, resp3::type::simple_error);
@@ -281,7 +281,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
conn->cancel(operation::reconnection); conn->cancel(operation::reconnection);
}; };
conn->async_receive(c3); conn->async_receive2(c3);
run(conn); run(conn);
@@ -326,4 +326,4 @@ BOOST_AUTO_TEST_CASE(issue_287_generic_response_error_then_success)
BOOST_TEST(resp.error().diagnostic == "ERR wrong number of arguments for 'set' command"); BOOST_TEST(resp.error().diagnostic == "ERR wrong number of arguments for 'set' command");
} }
} // namespace } // namespace

View File

@@ -41,7 +41,7 @@ class test_monitor {
void start_receive() void start_receive()
{ {
conn.async_receive([this](error_code ec, std::size_t) { conn.async_receive2([this](error_code ec) {
// We should expect one push entry, at least // We should expect one push entry, at least
BOOST_TEST_EQ(ec, error_code()); BOOST_TEST_EQ(ec, error_code());
BOOST_TEST(monitor_resp.has_value()); BOOST_TEST(monitor_resp.has_value());
@@ -118,4 +118,4 @@ int main()
test_monitor{}.run(); test_monitor{}.run();
return boost::report_errors(); return boost::report_errors();
} }

392
test/test_conn_push2.cpp Normal file
View File

@@ -0,0 +1,392 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/system/errc.hpp>
#include <string>
#define BOOST_TEST_MODULE conn_push
#include <boost/test/included/unit_test.hpp>
#include "common.hpp"
#include <cstddef>
#include <iostream>
namespace net = boost::asio;
namespace redis = boost::redis;
using boost::redis::operation;
using boost::redis::connection;
using boost::system::error_code;
using boost::redis::request;
using boost::redis::response;
using boost::redis::resp3::flat_tree;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::system::error_code;
using boost::redis::logger;
using namespace std::chrono_literals;
namespace {
BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
{
request req1;
req1.push("HELLO", 3);
req1.push("PING", "Message1");
request req2;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.push("PING", "Message2");
req3.push("QUIT");
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
bool push_received = false, c1_called = false, c2_called = false, c3_called = false;
auto c3 = [&](error_code ec, std::size_t) {
c3_called = true;
std::cout << "c3: " << ec.message() << std::endl;
};
auto c2 = [&, conn](error_code ec, std::size_t) {
c2_called = true;
BOOST_TEST(ec == error_code());
conn->async_exec(req3, ignore, c3);
};
auto c1 = [&, conn](error_code ec, std::size_t) {
c1_called = true;
BOOST_TEST(ec == error_code());
conn->async_exec(req2, ignore, c2);
};
conn->async_exec(req1, ignore, c1);
run(conn, make_test_config(), {});
conn->async_receive2([&, conn](error_code ec) {
std::cout << "async_receive2" << std::endl;
BOOST_TEST(ec == error_code());
push_received = true;
conn->cancel();
});
ioc.run_for(test_timeout);
BOOST_TEST(push_received);
BOOST_TEST(c1_called);
BOOST_TEST(c2_called);
BOOST_TEST(c3_called);
}
BOOST_AUTO_TEST_CASE(push_received1)
{
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
flat_tree resp;
conn->set_receive_response(resp);
// Trick: Uses SUBSCRIBE because this command has no response or
// better said, its response is a server push, which is what we
// want to test.
request req;
req.push("SUBSCRIBE", "channel1");
req.push("SUBSCRIBE", "channel2");
bool push_received = false, exec_finished = false;
conn->async_exec(req, ignore, [&, conn](error_code ec, std::size_t) {
exec_finished = true;
std::cout << "async_exec" << std::endl;
BOOST_TEST(ec == error_code());
});
conn->async_receive2([&, conn](error_code ec) {
push_received = true;
std::cout << "async_receive2" << std::endl;
BOOST_TEST(ec == error_code());
BOOST_CHECK_EQUAL(resp.get_total_msgs(), 2u);
conn->cancel();
});
run(conn);
ioc.run_for(test_timeout);
BOOST_TEST(exec_finished);
BOOST_TEST(push_received);
}
BOOST_AUTO_TEST_CASE(push_filtered_out)
{
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
request req;
req.push("HELLO", 3);
req.push("PING");
req.push("SUBSCRIBE", "channel");
req.push("QUIT");
response<ignore_t, std::string, std::string> resp;
bool exec_finished = false, push_received = false;
conn->async_exec(req, resp, [conn, &exec_finished](error_code ec, std::size_t) {
exec_finished = true;
BOOST_TEST(ec == error_code());
});
conn->async_receive2([&, conn](error_code ec) {
push_received = true;
BOOST_TEST(ec == error_code());
conn->cancel(operation::reconnection);
});
run(conn);
ioc.run_for(test_timeout);
BOOST_TEST(exec_finished);
BOOST_TEST(push_received);
BOOST_CHECK_EQUAL(std::get<1>(resp).value(), "PONG");
BOOST_CHECK_EQUAL(std::get<2>(resp).value(), "OK");
}
struct response_error_tag { };
response_error_tag error_tag_obj;
struct response_error_adapter {
void on_init() { }
void on_done() { }
void on_node(
boost::redis::resp3::basic_node<std::string_view> const&,
boost::system::error_code& ec)
{
ec = boost::redis::error::incompatible_size;
}
};
auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; }
BOOST_AUTO_TEST_CASE(test_push_adapter)
{
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
request req;
req.push("HELLO", 3);
req.push("PING");
req.push("SUBSCRIBE", "channel");
req.push("PING");
conn->set_receive_response(error_tag_obj);
bool push_received = false, exec_finished = false, run_finished = false;
conn->async_receive2([&, conn](error_code ec) {
BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled);
push_received = true;
});
conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) {
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
exec_finished = true;
});
auto cfg = make_test_config();
cfg.reconnect_wait_interval = 0s;
conn->async_run(cfg, [&run_finished](error_code ec) {
BOOST_CHECK_EQUAL(ec, redis::error::incompatible_size);
run_finished = true;
});
ioc.run_for(test_timeout);
BOOST_TEST(push_received);
BOOST_TEST(exec_finished);
BOOST_TEST(run_finished);
// TODO: Reset the ioc reconnect and send a quit to ensure
// reconnection is possible after an error.
}
void launch_push_consumer(std::shared_ptr<connection> conn)
{
conn->async_receive2([conn](error_code ec) {
if (ec) {
BOOST_TEST(ec == net::experimental::error::channel_cancelled);
return;
}
launch_push_consumer(conn);
});
}
BOOST_AUTO_TEST_CASE(many_subscribers)
{
request req0;
req0.get_config().cancel_on_connection_lost = false;
req0.push("HELLO", 3);
request req1;
req1.get_config().cancel_on_connection_lost = false;
req1.push("PING", "Message1");
request req2;
req2.get_config().cancel_on_connection_lost = false;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.get_config().cancel_on_connection_lost = false;
req3.push("QUIT");
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
bool finished = false;
auto c11 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->cancel(operation::reconnection);
finished = true;
};
auto c10 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req3, ignore, c11);
};
auto c9 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req2, ignore, c10);
};
auto c8 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req1, ignore, c9);
};
auto c7 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req2, ignore, c8);
};
auto c6 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req2, ignore, c7);
};
auto c5 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req1, ignore, c6);
};
auto c4 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req2, ignore, c5);
};
auto c3 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req1, ignore, c4);
};
auto c2 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req2, ignore, c3);
};
auto c1 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req2, ignore, c2);
};
auto c0 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req1, ignore, c1);
};
conn->async_exec(req0, ignore, c0);
launch_push_consumer(conn);
run(conn, make_test_config(), {});
ioc.run_for(test_timeout);
BOOST_TEST(finished);
}
BOOST_AUTO_TEST_CASE(test_unsubscribe)
{
net::io_context ioc;
connection conn{ioc};
// Subscribe to 3 channels and 2 patterns. Use CLIENT INFO to verify this took effect
request req_subscribe;
req_subscribe.push("SUBSCRIBE", "ch1", "ch2", "ch3");
req_subscribe.push("PSUBSCRIBE", "ch1*", "ch2*");
req_subscribe.push("CLIENT", "INFO");
// Then, unsubscribe from some of them, and verify again
request req_unsubscribe;
req_unsubscribe.push("UNSUBSCRIBE", "ch1");
req_unsubscribe.push("PUNSUBSCRIBE", "ch2*");
req_unsubscribe.push("CLIENT", "INFO");
// Finally, ping to verify that the connection is still usable
request req_ping;
req_ping.push("PING", "test_unsubscribe");
response<std::string> resp_subscribe, resp_unsubscribe, resp_ping;
bool subscribe_finished = false, unsubscribe_finished = false, ping_finished = false,
run_finished = false;
auto on_ping = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
ping_finished = true;
BOOST_TEST(std::get<0>(resp_ping).has_value());
BOOST_TEST(std::get<0>(resp_ping).value() == "test_unsubscribe");
conn.cancel();
};
auto on_unsubscribe = [&](error_code ec, std::size_t) {
unsubscribe_finished = true;
BOOST_TEST(ec == error_code());
BOOST_TEST(std::get<0>(resp_unsubscribe).has_value());
BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "sub") == "2");
BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "psub") == "1");
conn.async_exec(req_ping, resp_ping, on_ping);
};
auto on_subscribe = [&](error_code ec, std::size_t) {
subscribe_finished = true;
BOOST_TEST(ec == error_code());
BOOST_TEST(std::get<0>(resp_subscribe).has_value());
BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "sub") == "3");
BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "psub") == "2");
conn.async_exec(req_unsubscribe, resp_unsubscribe, on_unsubscribe);
};
conn.async_exec(req_subscribe, resp_subscribe, on_subscribe);
conn.async_run(make_test_config(), [&run_finished](error_code ec) {
BOOST_TEST(ec == net::error::operation_aborted);
run_finished = true;
});
ioc.run_for(test_timeout);
BOOST_TEST(subscribe_finished);
BOOST_TEST(unsubscribe_finished);
BOOST_TEST(ping_finished);
BOOST_TEST(run_finished);
}
} // namespace

View File

@@ -42,16 +42,15 @@ namespace {
// Push consumer // Push consumer
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void> auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{ {
std::cout << "uuu" << std::endl; std::cout << "Entering receiver" << std::endl;
while (conn->will_reconnect()) { while (conn->will_reconnect()) {
std::cout << "dddd" << std::endl; std::cout << "Reconnect loop" << std::endl;
// Loop reading Redis pushs messages. // Loop reading Redis pushes.
for (;;) { for (error_code ec;;) {
std::cout << "aaaa" << std::endl; std::cout << "Receive loop" << std::endl;
error_code ec; co_await conn->async_receive2(net::redirect_error(ec));
co_await conn->async_receive(net::redirect_error(ec));
if (ec) { if (ec) {
std::cout << "Error in async_receive" << std::endl; std::cout << "Error in async_receive2" << std::endl;
break; break;
} }
} }

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) /* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
* *
* Distributed under the Boost Software License, Version 1.0. (See * Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt) * accompanying file LICENSE.txt)
@@ -22,15 +22,20 @@
using boost::redis::request; using boost::redis::request;
using boost::redis::adapter::adapt2; using boost::redis::adapter::adapt2;
using boost::redis::adapter::result; using boost::redis::adapter::result;
using boost::redis::generic_response; using boost::redis::resp3::tree;
using boost::redis::resp3::flat_tree;
using boost::redis::generic_flat_response;
using boost::redis::ignore_t; using boost::redis::ignore_t;
using boost::redis::resp3::detail::deserialize; using boost::redis::resp3::detail::deserialize;
using boost::redis::resp3::node; using boost::redis::resp3::node;
using boost::redis::resp3::node_view;
using boost::redis::resp3::to_string; using boost::redis::resp3::to_string;
using boost::redis::response; using boost::redis::response;
using boost::redis::any_adapter; using boost::redis::any_adapter;
using boost::system::error_code; using boost::system::error_code;
namespace resp3 = boost::redis::resp3;
#define RESP3_SET_PART1 "~6\r\n+orange\r" #define RESP3_SET_PART1 "~6\r\n+orange\r"
#define RESP3_SET_PART2 "\n+apple\r\n+one" #define RESP3_SET_PART2 "\n+apple\r\n+one"
#define RESP3_SET_PART3 "\r\n+two\r" #define RESP3_SET_PART3 "\r\n+two\r"
@@ -42,7 +47,9 @@ BOOST_AUTO_TEST_CASE(low_level_sync_sans_io)
try { try {
result<std::set<std::string>> resp; result<std::set<std::string>> resp;
deserialize(resp3_set, adapt2(resp)); error_code ec;
deserialize(resp3_set, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
for (auto const& e : resp.value()) for (auto const& e : resp.value())
std::cout << e << std::endl; std::cout << e << std::endl;
@@ -65,7 +72,9 @@ BOOST_AUTO_TEST_CASE(issue_210_empty_set)
char const* wire = "*4\r\n:1\r\n~0\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; char const* wire = "*4\r\n:1\r\n~0\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n";
deserialize(wire, adapt2(resp)); error_code ec;
deserialize(wire, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1);
BOOST_CHECK(std::get<1>(resp.value()).value().empty()); BOOST_CHECK(std::get<1>(resp.value()).value().empty());
@@ -91,7 +100,9 @@ BOOST_AUTO_TEST_CASE(issue_210_non_empty_set_size_one)
char const* char const*
wire = "*4\r\n:1\r\n~1\r\n$3\r\nfoo\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; wire = "*4\r\n:1\r\n~1\r\n$3\r\nfoo\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n";
deserialize(wire, adapt2(resp)); error_code ec;
deserialize(wire, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1);
BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().size(), 1u); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().size(), 1u);
@@ -118,7 +129,9 @@ BOOST_AUTO_TEST_CASE(issue_210_non_empty_set_size_two)
char const* wire = char const* wire =
"*4\r\n:1\r\n~2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n"; "*4\r\n:1\r\n~2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n";
deserialize(wire, adapt2(resp)); error_code ec;
deserialize(wire, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1);
BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().at(0), std::string{"foo"}); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().at(0), std::string{"foo"});
@@ -140,7 +153,9 @@ BOOST_AUTO_TEST_CASE(issue_210_no_nested)
char const* char const*
wire = "*4\r\n:1\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n"; wire = "*4\r\n:1\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n";
deserialize(wire, adapt2(resp)); error_code ec;
deserialize(wire, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1); BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1);
BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value(), std::string{"foo"}); BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value(), std::string{"foo"});
@@ -159,7 +174,10 @@ BOOST_AUTO_TEST_CASE(issue_233_array_with_null)
result<std::vector<std::optional<std::string>>> resp; result<std::vector<std::optional<std::string>>> resp;
char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n"; char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n";
deserialize(wire, adapt2(resp));
error_code ec;
deserialize(wire, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_CHECK_EQUAL(resp.value().at(0).value(), "one"); BOOST_CHECK_EQUAL(resp.value().at(0).value(), "one");
BOOST_TEST(!resp.value().at(1).has_value()); BOOST_TEST(!resp.value().at(1).has_value());
@@ -177,7 +195,10 @@ BOOST_AUTO_TEST_CASE(issue_233_optional_array_with_null)
result<std::optional<std::vector<std::optional<std::string>>>> resp; result<std::optional<std::vector<std::optional<std::string>>>> resp;
char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n"; char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n";
deserialize(wire, adapt2(resp));
error_code ec;
deserialize(wire, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_CHECK_EQUAL(resp.value().value().at(0).value(), "one"); BOOST_CHECK_EQUAL(resp.value().value().at(0).value(), "one");
BOOST_TEST(!resp.value().value().at(1).has_value()); BOOST_TEST(!resp.value().value().at(1).has_value());
@@ -313,3 +334,172 @@ BOOST_AUTO_TEST_CASE(check_counter_adapter)
BOOST_CHECK_EQUAL(node, 7); BOOST_CHECK_EQUAL(node, 7);
BOOST_CHECK_EQUAL(done, 1); BOOST_CHECK_EQUAL(done, 1);
} }
namespace boost::redis::resp3 {
template <class String>
std::ostream& operator<<(std::ostream& os, basic_node<String> const& nd)
{
os << "type: " << to_string(nd.data_type) << "\n"
<< "aggregate_size: " << nd.aggregate_size << "\n"
<< "depth: " << nd.depth << "\n"
<< "value: " << nd.value << "\n";
return os;
}
template <class String>
std::ostream& operator<<(std::ostream& os, basic_tree<String> const& resp)
{
for (auto const& e: resp)
os << e << ",";
return os;
}
}
node from_node_view(node_view const& v)
{
node ret;
ret.data_type = v.data_type;
ret.aggregate_size = v.aggregate_size;
ret.depth = v.depth;
ret.value = v.value;
return ret;
}
tree from_flat(flat_tree const& resp)
{
tree ret;
for (auto const& e: resp.get_view())
ret.push_back(from_node_view(e));
return ret;
}
tree from_flat(generic_flat_response const& resp)
{
tree ret;
for (auto const& e: resp.value().get_view())
ret.push_back(from_node_view(e));
return ret;
}
// Parses the same data into a tree and a
// flat_tree, they should be equal to each other.
BOOST_AUTO_TEST_CASE(flat_tree_views_are_set)
{
tree resp1;
flat_tree resp2;
generic_flat_response resp3;
error_code ec;
deserialize(resp3_set, adapt2(resp1), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
deserialize(resp3_set, adapt2(resp2), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
deserialize(resp3_set, adapt2(resp3), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_CHECK_EQUAL(resp2.get_reallocs(), 1u);
BOOST_CHECK_EQUAL(resp2.get_total_msgs(), 1u);
BOOST_CHECK_EQUAL(resp3.value().get_reallocs(), 1u);
BOOST_CHECK_EQUAL(resp3.value().get_total_msgs(), 1u);
auto const tmp2 = from_flat(resp2);
BOOST_CHECK_EQUAL(resp1, tmp2);
auto const tmp3 = from_flat(resp3);
BOOST_CHECK_EQUAL(resp1, tmp3);
}
// The response should be reusable.
BOOST_AUTO_TEST_CASE(flat_tree_reuse)
{
flat_tree tmp;
// First use
error_code ec;
deserialize(resp3_set, adapt2(tmp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_CHECK_EQUAL(tmp.get_reallocs(), 1u);
BOOST_CHECK_EQUAL(tmp.get_total_msgs(), 1u);
// Copy to compare after the reuse.
auto const resp1 = tmp.get_view();
tmp.clear();
// Second use
deserialize(resp3_set, adapt2(tmp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
// No reallocation this time
BOOST_CHECK_EQUAL(tmp.get_reallocs(), 0u);
BOOST_CHECK_EQUAL(tmp.get_total_msgs(), 1u);
BOOST_CHECK_EQUAL(resp1, tmp.get_view());
}
BOOST_AUTO_TEST_CASE(flat_tree_copy_assign)
{
flat_tree resp;
error_code ec;
deserialize(resp3_set, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
// Copy
resp3::flat_tree copy1{resp};
// Copy assignment
resp3::flat_tree copy2 = resp;
// Assignment
resp3::flat_tree copy3;
copy3 = resp;
BOOST_TEST((copy1 == resp));
BOOST_TEST((copy2 == resp));
BOOST_TEST((copy3 == resp));
}
BOOST_AUTO_TEST_CASE(generic_flat_response_simple_error)
{
generic_flat_response resp;
char const* wire = "-Error\r\n";
error_code ec;
deserialize(wire, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_TEST(!resp.has_value());
BOOST_TEST(resp.has_error());
auto const error = resp.error();
BOOST_CHECK_EQUAL(error.data_type, boost::redis::resp3::type::simple_error);
BOOST_CHECK_EQUAL(error.diagnostic, std::string{"Error"});
}
BOOST_AUTO_TEST_CASE(generic_flat_response_blob_error)
{
generic_flat_response resp;
char const* wire = "!5\r\nError\r\n";
error_code ec;
deserialize(wire, adapt2(resp), ec);
BOOST_CHECK_EQUAL(ec, error_code{});
BOOST_TEST(!resp.has_value());
BOOST_TEST(resp.has_error());
auto const error = resp.error();
BOOST_CHECK_EQUAL(error.data_type, boost::redis::resp3::type::blob_error);
BOOST_CHECK_EQUAL(error.diagnostic, std::string{"Error"});
}