2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-20 17:12:09 +00:00

Compare commits

..

1 Commits

Author SHA1 Message Date
Marcelo Zimbres
a498f041af Adds async_receive2 function to replace async_receive. 2025-10-25 14:26:45 +02:00
19 changed files with 221 additions and 450 deletions

View File

@@ -106,19 +106,18 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
// Loop reading Redis pushes.
for (;;) {
error_code ec;
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
for (error_code ec;;) {
co_await conn->async_receive2(resp, redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
// Use the response resp in some way and then clear it.
...
consume_one(resp);
resp.value().clear();
}
}
}
@@ -126,4 +125,4 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
## Further reading
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).

View File

@@ -7,146 +7,6 @@
= Changelog
== Boost 1.90
* (Pull request https://github.com/boostorg/redis/pull/310[310])
Improves the per-operation support in `async_exec()`. Requests can now
be cancelled at any point, and cancellations don't interfere with other
requests anyhow. In previous versions, a cancellation could cause
`async_run()` to be cancelled, making cancellations unpredictable.
* (Issue https://github.com/boostorg/redis/issues/226[226])
Added support for the `asio::cancel_after` and `asio::cancel_at`
completion tokens in `async_exec()` and `async_run()`.
* (Issue https://github.com/boostorg/redis/issues/319[319])
Added support for per-operation cancellation in `async_run()`.
* (Pull request https://github.com/boostorg/redis/pull/329[329]
and https://github.com/boostorg/redis/pull/334[334])
The `cancel_on_connection_lost` and `cancel_if_not_connected`
flags in `request::config` have been deprecated, and will be removed
in subsequent releases. To limit the time span that `async_exec`
might take, use `asio::cancel_after`, instead.
`cancel_on_connection_lost` default has been changed to `false`.
This shouldn't cause much impact, since `cancel_on_connection_lost`
is not a reliable way to limit the time span that `async_exec` might take.
* (Pull request https://github.com/boostorg/redis/pull/321[321])
Calling `cancel` with `operation::resolve`, `operation::connect`,
`operation::ssl_handshake`, `operation::reconnection` and
`operation::health_check` is now deprecated.
These enumerators will be removed in subsequent releases.
Users should employ `cancel(operation::run)`, instead.
* (Issue github.com/boostorg/redis/issues/302[302] and
pull request https://github.com/boostorg/redis/pull/303[303])
Added support for custom setup requests using `config::setup`
and `config::use_setup`. When setting these fields, users can
replace the library-generated `HELLO` request by any other arbitrary request.
This request is executed every time a new physical connection with the server
is established. This feature can be used to interact with systems that don't
support `HELLO`, to handle authentication and to connect to replicas.
* (Pull request https://github.com/boostorg/redis/pull/305[305])
`request::config::hello_with_priority` and `request::has_hello_priority()` have
been deprecated and will be removed in subsequent releases.
This flag is not well specified and should only be used by the library.
If you need to execute a request before any other, use `config::setup`, instead.
* (Issue https://github.com/boostorg/redis/issues/296[296])
Valkey long-term support: we guarantee Valkey compatibility
starting with this release. Previous releases may also work,
but have not been tested with this database system.
* (Issue https://github.com/boostorg/redis/issues/341[341])
Adds a `request::append()` function, to concatenate request objects.
* (Issue https://github.com/boostorg/redis/issues/104[104])
The health checker algorithm has been redesigned to avoid
false positives under heavy loads. `PING` commands are now
only issued when the connection is idle, instead of periodically.
* (Pull request https://github.com/boostorg/redis/pull/283[283])
Added `config::read_buffer_append_size`, which allows to control
the expansion of the connection's read buffer.
* (Pull request https://github.com/boostorg/redis/pull/311[311])
Added `usage::bytes_rotated`, which measures data copying when
reading and parsing data from the server.
* (Issue https://github.com/boostorg/redis/issues/298[298])
Added support for authenticating users with an empty password
but a non-default username.
* (Issue https://github.com/boostorg/redis/issues/318[318])
Fixed a number of race conditions in the `cancel()` function
of `connection` and `basic_connection` that could cause
cancellations to be ignored.
* (Issue https://github.com/boostorg/redis/issues/290[290])
Fixed a problem that could cause an error during `HELLO`
to make subsequent `HELLO` attempts during reconnection to fail.
* (Issue https://github.com/boostorg/redis/issues/297[297])
Errors during `HELLO` are now correctly logged.
* (Issue https://github.com/boostorg/redis/issues/287[287])
Fixed a bug causing an exception to be thrown when parsing
a response that contains an intermediate error into a `generic_response`.
== Boost 1.89
* (Pull request https://github.com/boostorg/redis/pull/256[256],
https://github.com/boostorg/redis/pull/266[266] and
https://github.com/boostorg/redis/pull/273[273])
The following members in `connection` and `basic_connection` are now deprecated
and will be removed in subsequent releases:
* `next_layer()` and `next_layer_type`: there is no reason to access the underlying stream object directly.
Connection member functions should be used, instead.
* `get_ssl_context()`: SSL contexts should never be modified after an `asio::ssl::stream`
object has been created from them. Properties should be set before passing the context
to the constructor. There is no reason to access the SSL context after that.
* `reset_stream()`: connection internals have been refactored to reset the SSL stream
automatically when required. This function is now a no-op.
* The `async_run()` overload taking no parameters: use the `async_run`
overload taking a `config` object explicitly, instead.
* (Issue https://github.com/boostorg/redis/issues/213[213])
The logging interface has been re-written:
* Logging can now be customized by passing a function object
to the `logger` constructor. This allows integration with
third-party logging libraries, like spdlog.
This new logging interface is public and will be maintained long-term.
* The old, unstable interface consisting of `logger::on_xxx` functions has been removed.
* `connection` and `basic_connection` constructors now accept a `logger` object.
This is now the preferred way to configure logging.
The `async_run()` overload taking a `logger` object is now deprecated, and will
be removed in subsequent releases.
* `config::log_prefix` is now deprecated, and will
be removed in subsequent releases. Users can achieve the same effect
by passing a custom logging function to the `logger` constructor.
* The default logging function, which prints to `stderr`,
is now based on `printf` and is thus thread-safe.
The old function used `std::cerr` and could result to interleaved
output in multi-threaded programs.
* The default log level is now `logger::level::info`,
down from `logger::level::debug`. This results in less verbose output by default.
* (Issue https://github.com/boostorg/redis/issues/272[272])
Added support for connecting to Redis using UNIX domain sockets.
This feature can be accessed using `config::unix_socket`.
* (Issue https://github.com/boostorg/redis/issues/255[255])
Fixed an issue that caused `async_run` to complete when a connection
establishment error is encountered, even if `config::reconnect_wait_interval`
specified reconnection.
* (Issue https://github.com/boostorg/redis/issues/265[265])
`connection::async_exec` now uses `ignore` as the default response,
rather than having no default response. This matches
`basic_connection::async_exec` behavior.
* (Issue https://github.com/boostorg/redis/issues/260[260])
Fixed a memory corruption affecting the logger's prefix
configured in `config::log_prefix`.
* (Pull request https://github.com/boostorg/redis/pull/254[254])
Fixed some warnings regarding unused variables, name shadowing
and narrowing conversions.
* (Issue https://github.com/boostorg/redis/issues/252[252])
Fixed a bug that causes reconnection to ignore the reconnection
wait time configured in `config::reconnect_wait_interval`.
* (Issue https://github.com/boostorg/redis/issues/238[238])
Fixed a bug introduced in Boost 1.88 that caused `response<T>` to
not compile for some integral types.
* (Issue https://github.com/boostorg/redis/issues/247[247])
The documentation has been modernized, and a more complete
reference section has been added.
* Part of the internals have been refactored to allow for better
testing and reduce compile times.
== Boost 1.88
* (Issue https://github.com/boostorg/redis/issues/233[233])

View File

@@ -117,19 +117,18 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
// Loop reading Redis pushes.
for (;;) {
error_code ec;
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
for (error_code ec;;) {
co_await conn->async_receive2(resp, redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
// Use the response resp in some way and then clear it.
// Use the response here and then clear it.
...
consume_one(resp);
resp.value().clear();
}
}
}

View File

@@ -30,11 +30,9 @@ using boost::asio::consign;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::redirect_error;
using boost::asio::use_awaitable;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::request;
using boost::system::error_code;
using namespace std::chrono_literals;
@@ -52,11 +50,11 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
while (conn->will_reconnect()) {
// Subscribe to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
// Loop reading Redis push messages.
for (error_code ec;;) {
co_await conn->async_receive(redirect_error(use_awaitable, ec));
co_await conn->async_receive2(redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
@@ -74,7 +72,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
msg.erase(0, n);
}
}

View File

@@ -1,11 +1,10 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
@@ -23,11 +22,7 @@ namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::consume_one;
using boost::redis::logger;
using boost::redis::config;
using boost::redis::ignore;
using boost::redis::error;
using boost::system::error_code;
using boost::redis::connection;
using asio::signal_set;
@@ -60,24 +55,23 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
// Loop while reconnection is enabled
while (conn->will_reconnect()) {
// Reconnect to the channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
// Loop reading Redis pushs messages.
// Loop to read Redis push messages.
for (error_code ec;;) {
// First tries to read any buffered pushes.
conn->receive(ec);
if (ec == error::sync_receive_push_failed) {
ec = {};
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
}
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
<< resp.value().at(3).value << std::endl;
// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem: resp.value().get_view())
std::cout << elem.value.data << "\n";
consume_one(resp);
std::cout << std::endl;
resp.value().clear();
}
}
}

View File

@@ -593,7 +593,7 @@ public:
return async_run(config{}, std::forward<CompletionToken>(token));
}
/** @brief Receives server side pushes asynchronously.
/** @brief (Deprecated) Receives server side pushes asynchronously.
*
* When pushes arrive and there is no `async_receive` operation in
* progress, pushed data, requests, and responses will be paused
@@ -623,12 +623,76 @@ public:
* @param token Completion token.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
BOOST_DEPRECATED("Please use async_receive2 instead.")
auto async_receive(CompletionToken&& token = {})
{
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
}
/** @brief Receives server pushes synchronously without blocking.
/** @brief Wait for server pushes asynchronously
*
* This function suspends until a server push is received by the
* connection. On completion an unspecified number of pushes will
* have been added to the response object set with @ref
* boost::redis::connection::set_receive_response.
*
* To prevent receiving an unbound number of pushes the connection
* blocks further read operations on the socket when 256 pushes
* accumulate internally (we don't make any commitment to this
* exact number). When that happens ongoing `async_exec`s and
* health-checks won't make any progress and the connection will
* eventually timeout. To avoid that Apps should call
* `async_receive2` continuously in a loop.
*
* @Note To avoid deadlocks the task (e.g. coroutine) calling
* `async_receive2` should not call `async_exec` in a way where
* they could block each other.
*
* For an example see cpp20_subscriber.cpp. The completion token
* must have the following signature
*
* @code
* void f(system::error_code);
* @endcode
*
* @par Per-operation cancellation
* This operation supports the following cancellation types:
*
* @li `asio::cancellation_type_t::terminal`.
* @li `asio::cancellation_type_t::partial`.
* @li `asio::cancellation_type_t::total`.
*
* Calling `basic_connection::cancel(operation::receive)` will
* also cancel any ongoing receive operations.
*
* @param token Completion token.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
auto async_receive2(CompletionToken&& token = {})
{
return
impl_->receive_channel_.async_receive(
asio::deferred(
[&conn = *this](system::error_code ec, std::size_t)
{
if (!ec) {
auto f = [](system::error_code, std::size_t) {
// There is no point in checking for errors
// here since async_receive just completed
// without errors.
};
// We just want to drain the channel.
while (conn.impl_->receive_channel_.try_receive(f));
}
return asio::deferred.values(ec);
}
)
)(std::forward<CompletionToken>(token));
}
/** @brief (Deprecated) Receives server pushes synchronously without blocking.
*
* Receives a server push synchronously by calling `try_receive` on
* the underlying channel. If the operation fails because
@@ -638,6 +702,7 @@ public:
* @param ec Contains the error if any occurred.
* @returns The number of bytes read from the socket.
*/
BOOST_DEPRECATED("Please, use async_receive2 instead.")
std::size_t receive(system::error_code& ec)
{
std::size_t size = 0;
@@ -837,7 +902,7 @@ public:
"the other member functions to interact with the connection.")
auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); }
/// Sets the response object of @ref async_receive operations.
/// Sets the response object of @ref async_receive2 operations.
template <class Response>
void set_receive_response(Response& resp)
{
@@ -1028,12 +1093,21 @@ public:
/// @copydoc basic_connection::async_receive
template <class CompletionToken = asio::deferred_t>
BOOST_DEPRECATED("Please use async_receive2 instead.")
auto async_receive(CompletionToken&& token = {})
{
return impl_.async_receive(std::forward<CompletionToken>(token));
}
/// @copydoc basic_connection::async_receive2
template <class CompletionToken = asio::deferred_t>
auto async_receive2(CompletionToken&& token = {})
{
return impl_.async_receive2(std::forward<CompletionToken>(token));
}
/// @copydoc basic_connection::receive
BOOST_DEPRECATED("Please use async_receive2 instead.")
std::size_t receive(system::error_code& ec) { return impl_.receive(ec); }
/**

View File

@@ -46,13 +46,12 @@ class redis_stream {
void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; }
struct connect_op {
redis_stream& obj_;
redis_stream& obj;
connect_fsm fsm_;
template <class Self>
void execute_action(Self& self, connect_action act)
{
auto& obj = this->obj_; // prevent use-after-move errors
const auto& cfg = fsm_.get_config();
switch (act.type) {
@@ -110,7 +109,7 @@ class redis_stream {
auto act = fsm_.resume(
ec,
selected_endpoint,
obj_.st_,
obj.st_,
self.get_cancellation_state().cancelled());
execute_action(self, act);
}
@@ -122,9 +121,8 @@ class redis_stream {
system::error_code ec,
asio::ip::tcp::resolver::results_type endpoints)
{
auto act = fsm_.resume(ec, endpoints, obj_.st_, self.get_cancellation_state().cancelled());
auto act = fsm_.resume(ec, endpoints, obj.st_, self.get_cancellation_state().cancelled());
if (act.type == connect_action_type::tcp_connect) {
auto& obj = this->obj_; // prevent use-after-free errors
asio::async_connect(
obj.stream_.next_layer(),
std::move(endpoints),
@@ -137,7 +135,7 @@ class redis_stream {
template <class Self>
void operator()(Self& self, system::error_code ec = {})
{
auto act = fsm_.resume(ec, obj_.st_, self.get_cancellation_state().cancelled());
auto act = fsm_.resume(ec, obj.st_, self.get_cancellation_state().cancelled());
execute_action(self, act);
}
};

View File

@@ -74,7 +74,7 @@ enum class error
/// SSL handshake timeout
ssl_handshake_timeout,
/// Can't receive push synchronously without blocking
/// (Deprecated) Can't receive push synchronously without blocking
sync_receive_push_failed,
/// Incompatible node depth.

View File

@@ -31,10 +31,3 @@ request make_hello_request()
}
} // namespace boost::redis::detail
void boost::redis::request::append(const request& other)
{
payload_ += other.payload_;
commands_ += other.commands_;
expected_responses_ += other.expected_responses_;
}

View File

@@ -338,17 +338,6 @@ public:
push_range(cmd, cbegin(range), cend(range));
}
/** @brief Appends the commands in another request to the end of the request.
*
* Appends all the commands contained in `other` to the end of
* this request. Configuration flags in `*this`,
* like @ref config::cancel_if_unresponded, are *not* modified,
* even if `other` has a different config than `*this`.
*
* @param other The request containing the commands to append.
*/
void append(const request& other);
private:
void check_cmd(std::string_view cmd)
{

View File

@@ -70,13 +70,11 @@ using generic_response = adapter::result<std::vector<resp3::node>>;
* @param r The response to modify.
* @param ec Will be populated in case of error.
*/
BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.")
void consume_one(generic_response& r, system::error_code& ec);
/**
* @brief Throwing overload of `consume_one`.
*
* @param r The response to modify.
*/
/// @copydoc consume_one
BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.")
void consume_one(generic_response& r);
} // namespace boost::redis

View File

@@ -7,7 +7,7 @@ if (MSVC)
target_compile_options(boost_redis_project_options INTERFACE /bigobj /W4 /WX /wd4459)
target_compile_definitions(boost_redis_project_options INTERFACE _WIN32_WINNT=0x0601 _CRT_SECURE_NO_WARNINGS=1)
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang" OR CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
target_compile_options(boost_redis_project_options INTERFACE -Wall -Wextra -Werror)
target_compile_options(boost_redis_project_options INTERFACE -Wall -Wextra -Werror -Wno-deprecated-declarations)
endif()
add_library(boost_redis_src STATIC boost_redis.cpp)

View File

@@ -72,26 +72,26 @@ void test_exec()
BOOST_TEST(exec_finished);
}
template <class Connection>
void test_receive()
{
// Setup
asio::io_context ioc;
Connection conn{ioc};
bool receive_finished = false;
generic_response resp;
conn.set_receive_response(resp);
// Call the function with a very short timeout.
conn.async_receive(asio::cancel_after(1ms, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, asio::experimental::channel_errc::channel_cancelled);
receive_finished = true;
}));
ioc.run_for(test_timeout);
BOOST_TEST(receive_finished);
}
//template <class Connection>
//void test_receive()
//{
// // Setup
// asio::io_context ioc;
// Connection conn{ioc};
// bool receive_finished = false;
// generic_response resp;
// conn.set_receive_response(resp);
//
// // Call the function with a very short timeout.
// conn.async_receive(asio::cancel_after(1ms, [&](error_code ec, std::size_t) {
// BOOST_TEST_EQ(ec, asio::experimental::channel_errc::channel_cancelled);
// receive_finished = true;
// }));
//
// ioc.run_for(test_timeout);
//
// BOOST_TEST(receive_finished);
//}
} // namespace
@@ -103,8 +103,8 @@ int main()
test_exec<basic_connection<asio::io_context::executor_type>>();
test_exec<connection>();
test_receive<basic_connection<asio::io_context::executor_type>>();
test_receive<connection>();
//test_receive<basic_connection<asio::io_context::executor_type>>();
//test_receive<connection>();
return boost::report_errors();
}

View File

@@ -27,6 +27,7 @@ using error_code = boost::system::error_code;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::logger;
@@ -54,39 +55,44 @@ std::ostream& operator<<(std::ostream& os, usage const& u)
namespace {
auto push_consumer(connection& conn, int expected) -> net::awaitable<void>
auto
receiver(
connection& conn,
generic_response& resp,
std::size_t expected) -> net::awaitable<void>
{
int c = 0;
for (error_code ec;;) {
conn.receive(ec);
if (ec == error::sync_receive_push_failed) {
ec = {};
co_await conn.async_receive(net::redirect_error(ec));
} else if (!ec) {
//std::cout << "Skipping suspension." << std::endl;
}
if (ec) {
BOOST_TEST(false, "push_consumer error: " << ec.message());
co_return;
}
if (++c == expected)
break;
std::size_t push_counter = 0;
while (push_counter != expected) {
co_await conn.async_receive2();
push_counter += resp.value().get_total_msgs();
resp.value().clear();
}
conn.cancel();
}
auto echo_session(connection& conn, const request& pubs, int n) -> net::awaitable<void>
auto echo_session(connection& conn, const request& req, std::size_t n) -> net::awaitable<void>
{
for (auto i = 0; i < n; ++i)
co_await conn.async_exec(pubs);
for (auto i = 0u; i < n; ++i)
co_await conn.async_exec(req);
}
void rethrow_on_error(std::exception_ptr exc)
{
if (exc)
if (exc) {
BOOST_TEST(false);
std::rethrow_exception(exc);
}
}
request make_pub_req(std::size_t n_pubs)
{
request req;
req.push("PING");
for (std::size_t i = 0u; i < n_pubs; ++i)
req.push("PUBLISH", "channel", "payload");
return req;
}
BOOST_AUTO_TEST_CASE(echo_stress)
@@ -98,22 +104,22 @@ BOOST_AUTO_TEST_CASE(echo_stress)
// Number of coroutines that will send pings sharing the same
// connection to redis.
constexpr int sessions = 150;
constexpr std::size_t sessions = 150u;
// The number of pings that will be sent by each session.
constexpr int msgs = 200;
constexpr std::size_t msgs = 200u;
// The number of publishes that will be sent by each session with
// each message.
constexpr int n_pubs = 25;
constexpr std::size_t n_pubs = 25u;
// This is the total number of pushes we will receive.
constexpr int total_pushes = sessions * msgs * n_pubs + 1;
constexpr std::size_t total_pushes = sessions * msgs * n_pubs + 1;
request pubs;
pubs.push("PING");
for (int i = 0; i < n_pubs; ++i)
pubs.push("PUBLISH", "channel", "payload");
generic_response resp;
conn.set_receive_response(resp);
request const pub_req = make_pub_req(n_pubs);
// Run the connection
bool run_finished = false, subscribe_finished = false;
@@ -123,6 +129,10 @@ BOOST_AUTO_TEST_CASE(echo_stress)
std::clog << "async_run finished" << std::endl;
});
// Op that will consume the pushes counting down until all expected
// pushes have been received.
net::co_spawn(ctx, receiver(conn, resp, total_pushes), rethrow_on_error);
// Subscribe, then launch the coroutines
request req;
req.push("SUBSCRIBE", "channel");
@@ -130,12 +140,8 @@ BOOST_AUTO_TEST_CASE(echo_stress)
subscribe_finished = true;
BOOST_TEST(ec == error_code());
// Op that will consume the pushes counting down until all expected
// pushes have been received.
net::co_spawn(ctx, push_consumer(conn, total_pushes), rethrow_on_error);
for (int i = 0; i < sessions; ++i)
net::co_spawn(ctx, echo_session(conn, pubs, msgs), rethrow_on_error);
for (std::size_t i = 0; i < sessions; ++i)
net::co_spawn(ctx, echo_session(conn, pub_req, msgs), rethrow_on_error);
});
// Run the test
@@ -144,7 +150,13 @@ BOOST_AUTO_TEST_CASE(echo_stress)
BOOST_TEST(subscribe_finished);
// Print statistics
std::cout << "-------------------\n" << conn.get_usage() << std::endl;
std::cout
<< "-------------------\n"
<< "Usage data: \n"
<< conn.get_usage() << "\n"
<< "-------------------\n"
<< "Reallocations: " << resp.value().get_reallocs()
<< std::endl;
}
} // namespace

View File

@@ -269,9 +269,9 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
generic_response gresp;
conn->set_receive_response(gresp);
auto c3 = [&](error_code ec, std::size_t) {
auto c3 = [&](error_code ec) {
c3_called = true;
std::cout << "async_receive" << std::endl;
std::cout << "async_receive2" << std::endl;
BOOST_TEST(!ec);
BOOST_TEST(gresp.has_error());
BOOST_CHECK_EQUAL(gresp.error().data_type, resp3::type::simple_error);
@@ -281,7 +281,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
conn->cancel(operation::reconnection);
};
conn->async_receive(c3);
conn->async_receive2(c3);
run(conn);
@@ -326,4 +326,4 @@ BOOST_AUTO_TEST_CASE(issue_287_generic_response_error_then_success)
BOOST_TEST(resp.error().diagnostic == "ERR wrong number of arguments for 'set' command");
}
} // namespace
} // namespace

View File

@@ -41,7 +41,7 @@ class test_monitor {
void start_receive()
{
conn.async_receive([this](error_code ec, std::size_t) {
conn.async_receive2([this](error_code ec) {
// We should expect one push entry, at least
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST(monitor_resp.has_value());
@@ -118,4 +118,4 @@ int main()
test_monitor{}.run();
return boost::report_errors();
}
}

View File

@@ -30,6 +30,7 @@ using boost::redis::connection;
using boost::system::error_code;
using boost::redis::request;
using boost::redis::response;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::system::error_code;
@@ -78,8 +79,8 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
run(conn, make_test_config(), {});
conn->async_receive([&, conn](error_code ec, std::size_t) {
std::cout << "async_receive" << std::endl;
conn->async_receive2([&, conn](error_code ec) {
std::cout << "async_receive2" << std::endl;
BOOST_TEST(ec == error_code());
push_received = true;
conn->cancel();
@@ -98,10 +99,12 @@ BOOST_AUTO_TEST_CASE(push_received1)
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
generic_response resp;
conn->set_receive_response(resp);
// Trick: Uses SUBSCRIBE because this command has no response or
// better said, its response is a server push, which is what we
// want to test. We send two because we want to test both
// async_receive and receive.
// want to test.
request req;
req.push("SUBSCRIBE", "channel1");
req.push("SUBSCRIBE", "channel2");
@@ -114,25 +117,13 @@ BOOST_AUTO_TEST_CASE(push_received1)
BOOST_TEST(ec == error_code());
});
conn->async_receive([&, conn](error_code ec, std::size_t) {
conn->async_receive2([&, conn](error_code ec) {
push_received = true;
std::cout << "(1) async_receive" << std::endl;
std::cout << "async_receive2" << std::endl;
BOOST_TEST(ec == error_code());
// Receives the second push synchronously.
error_code ec2;
std::size_t res = 0;
res = conn->receive(ec2);
BOOST_TEST(!ec2);
BOOST_TEST(res != std::size_t(0));
// Tries to receive a third push synchronously.
ec2 = {};
res = conn->receive(ec2);
BOOST_CHECK_EQUAL(
ec2,
boost::redis::make_error_code(boost::redis::error::sync_receive_push_failed));
BOOST_CHECK_EQUAL(resp.value().get_total_msgs(), 2u);
conn->cancel();
});
@@ -164,7 +155,7 @@ BOOST_AUTO_TEST_CASE(push_filtered_out)
BOOST_TEST(ec == error_code());
});
conn->async_receive([&, conn](error_code ec, std::size_t) {
conn->async_receive2([&, conn](error_code ec) {
push_received = true;
BOOST_TEST(ec == error_code());
conn->cancel(operation::reconnection);
@@ -212,7 +203,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
bool push_received = false, exec_finished = false, run_finished = false;
conn->async_receive([&, conn](error_code ec, std::size_t) {
conn->async_receive2([&, conn](error_code ec) {
BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled);
push_received = true;
});
@@ -240,7 +231,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
void launch_push_consumer(std::shared_ptr<connection> conn)
{
conn->async_receive([conn](error_code ec, std::size_t) {
conn->async_receive2([conn](error_code ec) {
if (ec) {
BOOST_TEST(ec == net::experimental::error::channel_cancelled);
return;

View File

@@ -42,16 +42,15 @@ namespace {
// Push consumer
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
std::cout << "uuu" << std::endl;
std::cout << "Entering receiver" << std::endl;
while (conn->will_reconnect()) {
std::cout << "dddd" << std::endl;
// Loop reading Redis pushs messages.
for (;;) {
std::cout << "aaaa" << std::endl;
error_code ec;
co_await conn->async_receive(net::redirect_error(ec));
std::cout << "Reconnect loop" << std::endl;
// Loop reading Redis pushes.
for (error_code ec;;) {
std::cout << "Receive loop" << std::endl;
co_await conn->async_receive2(net::redirect_error(ec));
if (ec) {
std::cout << "Error in async_receive" << std::endl;
std::cout << "Error in async_receive2" << std::endl;
break;
}
}

View File

@@ -4,188 +4,55 @@
* accompanying file LICENSE.txt)
*/
#include <iostream>
#define BOOST_TEST_MODULE request
#include <boost/redis/request.hpp>
#include <boost/core/lightweight_test.hpp>
#include <map>
#include <string>
#include <string_view>
#include <boost/test/included/unit_test.hpp>
using boost::redis::request;
// TODO: Serialization.
namespace {
void test_push_no_args()
BOOST_AUTO_TEST_CASE(single_arg_allocator)
{
request req1;
req1.push("PING");
BOOST_TEST_EQ(req1.payload(), "*1\r\n$4\r\nPING\r\n");
BOOST_CHECK_EQUAL(req1.payload(), std::string{"*1\r\n$4\r\nPING\r\n"});
}
void test_push_int()
BOOST_AUTO_TEST_CASE(arg_int)
{
request req;
req.push("PING", 42);
BOOST_TEST_EQ(req.payload(), "*2\r\n$4\r\nPING\r\n$2\r\n42\r\n");
BOOST_CHECK_EQUAL(req.payload(), std::string{"*2\r\n$4\r\nPING\r\n$2\r\n42\r\n"});
}
void test_push_multiple_args()
BOOST_AUTO_TEST_CASE(multiple_args)
{
char const* res = "*5\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n$2\r\nEX\r\n$1\r\n2\r\n";
request req;
req.push("SET", "key", "value", "EX", "2");
BOOST_TEST_EQ(req.payload(), res);
BOOST_CHECK_EQUAL(req.payload(), std::string{res});
}
void test_push_range()
BOOST_AUTO_TEST_CASE(container_and_range)
{
std::map<std::string, std::string> in{
{"key1", "value1"},
{"key2", "value2"}
};
constexpr std::string_view expected =
char const* res =
"*6\r\n$4\r\nHSET\r\n$3\r\nkey\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$"
"6\r\nvalue2\r\n";
request req1;
req1.push_range("HSET", "key", in);
BOOST_TEST_EQ(req1.payload(), expected);
BOOST_CHECK_EQUAL(req1.payload(), std::string{res});
request req2;
req2.push_range("HSET", "key", std::cbegin(in), std::cend(in));
BOOST_TEST_EQ(req2.payload(), expected);
}
// Append
void test_append()
{
request req1;
req1.push("PING", "req1");
request req2;
req2.push("GET", "mykey");
req2.push("GET", "other");
req1.append(req2);
constexpr std::string_view expected =
"*2\r\n$4\r\nPING\r\n$4\r\nreq1\r\n"
"*2\r\n$3\r\nGET\r\n$5\r\nmykey\r\n"
"*2\r\n$3\r\nGET\r\n$5\r\nother\r\n";
BOOST_TEST_EQ(req1.payload(), expected);
BOOST_TEST_EQ(req1.get_commands(), 3u);
BOOST_TEST_EQ(req1.get_expected_responses(), 3u);
}
// Commands without responses are handled correctly
void test_append_no_response()
{
request req1;
req1.push("PING", "req1");
request req2;
req2.push("SUBSCRIBE", "mychannel");
req2.push("GET", "other");
req1.append(req2);
constexpr std::string_view expected =
"*2\r\n$4\r\nPING\r\n$4\r\nreq1\r\n"
"*2\r\n$9\r\nSUBSCRIBE\r\n$9\r\nmychannel\r\n"
"*2\r\n$3\r\nGET\r\n$5\r\nother\r\n";
BOOST_TEST_EQ(req1.payload(), expected);
BOOST_TEST_EQ(req1.get_commands(), 3u);
BOOST_TEST_EQ(req1.get_expected_responses(), 2u);
}
// Flags are not modified by append
void test_append_flags()
{
request req1;
req1.get_config().cancel_if_not_connected = false;
req1.get_config().cancel_if_unresponded = false;
req1.get_config().cancel_on_connection_lost = false;
req1.push("PING", "req1");
request req2;
req2.get_config().cancel_if_not_connected = true;
req2.get_config().cancel_if_unresponded = true;
req2.get_config().cancel_on_connection_lost = true;
req2.push("GET", "other");
req1.append(req2);
constexpr std::string_view expected =
"*2\r\n$4\r\nPING\r\n$4\r\nreq1\r\n"
"*2\r\n$3\r\nGET\r\n$5\r\nother\r\n";
BOOST_TEST_EQ(req1.payload(), expected);
BOOST_TEST_NOT(req1.get_config().cancel_if_not_connected);
BOOST_TEST_NOT(req1.get_config().cancel_if_unresponded);
BOOST_TEST_NOT(req1.get_config().cancel_on_connection_lost);
}
// Empty requests don't cause problems with append
void test_append_target_empty()
{
request req1;
request req2;
req2.push("GET", "other");
req1.append(req2);
constexpr std::string_view expected = "*2\r\n$3\r\nGET\r\n$5\r\nother\r\n";
BOOST_TEST_EQ(req1.payload(), expected);
BOOST_TEST_EQ(req1.get_commands(), 1u);
BOOST_TEST_EQ(req1.get_expected_responses(), 1u);
}
void test_append_source_empty()
{
request req1;
req1.push("GET", "other");
request req2;
req1.append(req2);
constexpr std::string_view expected = "*2\r\n$3\r\nGET\r\n$5\r\nother\r\n";
BOOST_TEST_EQ(req1.payload(), expected);
BOOST_TEST_EQ(req1.get_commands(), 1u);
BOOST_TEST_EQ(req1.get_expected_responses(), 1u);
}
void test_append_both_empty()
{
request req1;
request req2;
req1.append(req2);
BOOST_TEST_EQ(req1.payload(), "");
BOOST_TEST_EQ(req1.get_commands(), 0u);
BOOST_TEST_EQ(req1.get_expected_responses(), 0u);
}
} // namespace
int main()
{
test_push_no_args();
test_push_int();
test_push_multiple_args();
test_push_range();
test_append();
test_append_no_response();
test_append_flags();
test_append_target_empty();
test_append_source_empty();
test_append_both_empty();
return boost::report_errors();
BOOST_CHECK_EQUAL(req2.payload(), std::string{res});
}