2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 16:52:08 +00:00

Compare commits

...

1 Commits

Author SHA1 Message Date
Marcelo Zimbres
a498f041af Adds async_receive2 function to replace async_receive. 2025-10-25 14:26:45 +02:00
14 changed files with 203 additions and 139 deletions

View File

@@ -106,19 +106,18 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
// Loop reading Redis pushes.
for (;;) {
error_code ec;
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
for (error_code ec;;) {
co_await conn->async_receive2(resp, redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
// Use the response resp in some way and then clear it.
...
consume_one(resp);
resp.value().clear();
}
}
}
@@ -126,4 +125,4 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
## Further reading
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).

View File

@@ -117,19 +117,18 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
// Loop reading Redis pushes.
for (;;) {
error_code ec;
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
for (error_code ec;;) {
co_await conn->async_receive2(resp, redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
// Use the response resp in some way and then clear it.
// Use the response here and then clear it.
...
consume_one(resp);
resp.value().clear();
}
}
}

View File

@@ -30,11 +30,9 @@ using boost::asio::consign;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::redirect_error;
using boost::asio::use_awaitable;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::request;
using boost::system::error_code;
using namespace std::chrono_literals;
@@ -52,11 +50,11 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
while (conn->will_reconnect()) {
// Subscribe to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
// Loop reading Redis push messages.
for (error_code ec;;) {
co_await conn->async_receive(redirect_error(use_awaitable, ec));
co_await conn->async_receive2(redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
@@ -74,7 +72,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
msg.erase(0, n);
}
}

View File

@@ -1,11 +1,10 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
@@ -23,11 +22,7 @@ namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::consume_one;
using boost::redis::logger;
using boost::redis::config;
using boost::redis::ignore;
using boost::redis::error;
using boost::system::error_code;
using boost::redis::connection;
using asio::signal_set;
@@ -60,24 +55,23 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
// Loop while reconnection is enabled
while (conn->will_reconnect()) {
// Reconnect to the channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
// Loop reading Redis pushs messages.
// Loop to read Redis push messages.
for (error_code ec;;) {
// First tries to read any buffered pushes.
conn->receive(ec);
if (ec == error::sync_receive_push_failed) {
ec = {};
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
}
// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
<< resp.value().at(3).value << std::endl;
// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem: resp.value().get_view())
std::cout << elem.value.data << "\n";
consume_one(resp);
std::cout << std::endl;
resp.value().clear();
}
}
}

View File

@@ -593,7 +593,7 @@ public:
return async_run(config{}, std::forward<CompletionToken>(token));
}
/** @brief Receives server side pushes asynchronously.
/** @brief (Deprecated) Receives server side pushes asynchronously.
*
* When pushes arrive and there is no `async_receive` operation in
* progress, pushed data, requests, and responses will be paused
@@ -623,12 +623,76 @@ public:
* @param token Completion token.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
BOOST_DEPRECATED("Please use async_receive2 instead.")
auto async_receive(CompletionToken&& token = {})
{
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
}
/** @brief Receives server pushes synchronously without blocking.
/** @brief Wait for server pushes asynchronously
*
* This function suspends until a server push is received by the
* connection. On completion an unspecified number of pushes will
* have been added to the response object set with @ref
* boost::redis::connection::set_receive_response.
*
* To prevent receiving an unbound number of pushes the connection
* blocks further read operations on the socket when 256 pushes
* accumulate internally (we don't make any commitment to this
* exact number). When that happens ongoing `async_exec`s and
* health-checks won't make any progress and the connection will
* eventually timeout. To avoid that Apps should call
* `async_receive2` continuously in a loop.
*
* @Note To avoid deadlocks the task (e.g. coroutine) calling
* `async_receive2` should not call `async_exec` in a way where
* they could block each other.
*
* For an example see cpp20_subscriber.cpp. The completion token
* must have the following signature
*
* @code
* void f(system::error_code);
* @endcode
*
* @par Per-operation cancellation
* This operation supports the following cancellation types:
*
* @li `asio::cancellation_type_t::terminal`.
* @li `asio::cancellation_type_t::partial`.
* @li `asio::cancellation_type_t::total`.
*
* Calling `basic_connection::cancel(operation::receive)` will
* also cancel any ongoing receive operations.
*
* @param token Completion token.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
auto async_receive2(CompletionToken&& token = {})
{
return
impl_->receive_channel_.async_receive(
asio::deferred(
[&conn = *this](system::error_code ec, std::size_t)
{
if (!ec) {
auto f = [](system::error_code, std::size_t) {
// There is no point in checking for errors
// here since async_receive just completed
// without errors.
};
// We just want to drain the channel.
while (conn.impl_->receive_channel_.try_receive(f));
}
return asio::deferred.values(ec);
}
)
)(std::forward<CompletionToken>(token));
}
/** @brief (Deprecated) Receives server pushes synchronously without blocking.
*
* Receives a server push synchronously by calling `try_receive` on
* the underlying channel. If the operation fails because
@@ -638,6 +702,7 @@ public:
* @param ec Contains the error if any occurred.
* @returns The number of bytes read from the socket.
*/
BOOST_DEPRECATED("Please, use async_receive2 instead.")
std::size_t receive(system::error_code& ec)
{
std::size_t size = 0;
@@ -837,7 +902,7 @@ public:
"the other member functions to interact with the connection.")
auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); }
/// Sets the response object of @ref async_receive operations.
/// Sets the response object of @ref async_receive2 operations.
template <class Response>
void set_receive_response(Response& resp)
{
@@ -1028,12 +1093,21 @@ public:
/// @copydoc basic_connection::async_receive
template <class CompletionToken = asio::deferred_t>
BOOST_DEPRECATED("Please use async_receive2 instead.")
auto async_receive(CompletionToken&& token = {})
{
return impl_.async_receive(std::forward<CompletionToken>(token));
}
/// @copydoc basic_connection::async_receive2
template <class CompletionToken = asio::deferred_t>
auto async_receive2(CompletionToken&& token = {})
{
return impl_.async_receive2(std::forward<CompletionToken>(token));
}
/// @copydoc basic_connection::receive
BOOST_DEPRECATED("Please use async_receive2 instead.")
std::size_t receive(system::error_code& ec) { return impl_.receive(ec); }
/**

View File

@@ -74,7 +74,7 @@ enum class error
/// SSL handshake timeout
ssl_handshake_timeout,
/// Can't receive push synchronously without blocking
/// (Deprecated) Can't receive push synchronously without blocking
sync_receive_push_failed,
/// Incompatible node depth.

View File

@@ -70,13 +70,11 @@ using generic_response = adapter::result<std::vector<resp3::node>>;
* @param r The response to modify.
* @param ec Will be populated in case of error.
*/
BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.")
void consume_one(generic_response& r, system::error_code& ec);
/**
* @brief Throwing overload of `consume_one`.
*
* @param r The response to modify.
*/
/// @copydoc consume_one
BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.")
void consume_one(generic_response& r);
} // namespace boost::redis

View File

@@ -7,7 +7,7 @@ if (MSVC)
target_compile_options(boost_redis_project_options INTERFACE /bigobj /W4 /WX /wd4459)
target_compile_definitions(boost_redis_project_options INTERFACE _WIN32_WINNT=0x0601 _CRT_SECURE_NO_WARNINGS=1)
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang" OR CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
target_compile_options(boost_redis_project_options INTERFACE -Wall -Wextra -Werror)
target_compile_options(boost_redis_project_options INTERFACE -Wall -Wextra -Werror -Wno-deprecated-declarations)
endif()
add_library(boost_redis_src STATIC boost_redis.cpp)

View File

@@ -72,26 +72,26 @@ void test_exec()
BOOST_TEST(exec_finished);
}
template <class Connection>
void test_receive()
{
// Setup
asio::io_context ioc;
Connection conn{ioc};
bool receive_finished = false;
generic_response resp;
conn.set_receive_response(resp);
// Call the function with a very short timeout.
conn.async_receive(asio::cancel_after(1ms, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, asio::experimental::channel_errc::channel_cancelled);
receive_finished = true;
}));
ioc.run_for(test_timeout);
BOOST_TEST(receive_finished);
}
//template <class Connection>
//void test_receive()
//{
// // Setup
// asio::io_context ioc;
// Connection conn{ioc};
// bool receive_finished = false;
// generic_response resp;
// conn.set_receive_response(resp);
//
// // Call the function with a very short timeout.
// conn.async_receive(asio::cancel_after(1ms, [&](error_code ec, std::size_t) {
// BOOST_TEST_EQ(ec, asio::experimental::channel_errc::channel_cancelled);
// receive_finished = true;
// }));
//
// ioc.run_for(test_timeout);
//
// BOOST_TEST(receive_finished);
//}
} // namespace
@@ -103,8 +103,8 @@ int main()
test_exec<basic_connection<asio::io_context::executor_type>>();
test_exec<connection>();
test_receive<basic_connection<asio::io_context::executor_type>>();
test_receive<connection>();
//test_receive<basic_connection<asio::io_context::executor_type>>();
//test_receive<connection>();
return boost::report_errors();
}

View File

@@ -27,6 +27,7 @@ using error_code = boost::system::error_code;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::logger;
@@ -54,39 +55,44 @@ std::ostream& operator<<(std::ostream& os, usage const& u)
namespace {
auto push_consumer(connection& conn, int expected) -> net::awaitable<void>
auto
receiver(
connection& conn,
generic_response& resp,
std::size_t expected) -> net::awaitable<void>
{
int c = 0;
for (error_code ec;;) {
conn.receive(ec);
if (ec == error::sync_receive_push_failed) {
ec = {};
co_await conn.async_receive(net::redirect_error(ec));
} else if (!ec) {
//std::cout << "Skipping suspension." << std::endl;
}
if (ec) {
BOOST_TEST(false, "push_consumer error: " << ec.message());
co_return;
}
if (++c == expected)
break;
std::size_t push_counter = 0;
while (push_counter != expected) {
co_await conn.async_receive2();
push_counter += resp.value().get_total_msgs();
resp.value().clear();
}
conn.cancel();
}
auto echo_session(connection& conn, const request& pubs, int n) -> net::awaitable<void>
auto echo_session(connection& conn, const request& req, std::size_t n) -> net::awaitable<void>
{
for (auto i = 0; i < n; ++i)
co_await conn.async_exec(pubs);
for (auto i = 0u; i < n; ++i)
co_await conn.async_exec(req);
}
void rethrow_on_error(std::exception_ptr exc)
{
if (exc)
if (exc) {
BOOST_TEST(false);
std::rethrow_exception(exc);
}
}
request make_pub_req(std::size_t n_pubs)
{
request req;
req.push("PING");
for (std::size_t i = 0u; i < n_pubs; ++i)
req.push("PUBLISH", "channel", "payload");
return req;
}
BOOST_AUTO_TEST_CASE(echo_stress)
@@ -98,22 +104,22 @@ BOOST_AUTO_TEST_CASE(echo_stress)
// Number of coroutines that will send pings sharing the same
// connection to redis.
constexpr int sessions = 150;
constexpr std::size_t sessions = 150u;
// The number of pings that will be sent by each session.
constexpr int msgs = 200;
constexpr std::size_t msgs = 200u;
// The number of publishes that will be sent by each session with
// each message.
constexpr int n_pubs = 25;
constexpr std::size_t n_pubs = 25u;
// This is the total number of pushes we will receive.
constexpr int total_pushes = sessions * msgs * n_pubs + 1;
constexpr std::size_t total_pushes = sessions * msgs * n_pubs + 1;
request pubs;
pubs.push("PING");
for (int i = 0; i < n_pubs; ++i)
pubs.push("PUBLISH", "channel", "payload");
generic_response resp;
conn.set_receive_response(resp);
request const pub_req = make_pub_req(n_pubs);
// Run the connection
bool run_finished = false, subscribe_finished = false;
@@ -123,6 +129,10 @@ BOOST_AUTO_TEST_CASE(echo_stress)
std::clog << "async_run finished" << std::endl;
});
// Op that will consume the pushes counting down until all expected
// pushes have been received.
net::co_spawn(ctx, receiver(conn, resp, total_pushes), rethrow_on_error);
// Subscribe, then launch the coroutines
request req;
req.push("SUBSCRIBE", "channel");
@@ -130,12 +140,8 @@ BOOST_AUTO_TEST_CASE(echo_stress)
subscribe_finished = true;
BOOST_TEST(ec == error_code());
// Op that will consume the pushes counting down until all expected
// pushes have been received.
net::co_spawn(ctx, push_consumer(conn, total_pushes), rethrow_on_error);
for (int i = 0; i < sessions; ++i)
net::co_spawn(ctx, echo_session(conn, pubs, msgs), rethrow_on_error);
for (std::size_t i = 0; i < sessions; ++i)
net::co_spawn(ctx, echo_session(conn, pub_req, msgs), rethrow_on_error);
});
// Run the test
@@ -144,7 +150,13 @@ BOOST_AUTO_TEST_CASE(echo_stress)
BOOST_TEST(subscribe_finished);
// Print statistics
std::cout << "-------------------\n" << conn.get_usage() << std::endl;
std::cout
<< "-------------------\n"
<< "Usage data: \n"
<< conn.get_usage() << "\n"
<< "-------------------\n"
<< "Reallocations: " << resp.value().get_reallocs()
<< std::endl;
}
} // namespace

View File

@@ -269,9 +269,9 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
generic_response gresp;
conn->set_receive_response(gresp);
auto c3 = [&](error_code ec, std::size_t) {
auto c3 = [&](error_code ec) {
c3_called = true;
std::cout << "async_receive" << std::endl;
std::cout << "async_receive2" << std::endl;
BOOST_TEST(!ec);
BOOST_TEST(gresp.has_error());
BOOST_CHECK_EQUAL(gresp.error().data_type, resp3::type::simple_error);
@@ -281,7 +281,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
conn->cancel(operation::reconnection);
};
conn->async_receive(c3);
conn->async_receive2(c3);
run(conn);
@@ -326,4 +326,4 @@ BOOST_AUTO_TEST_CASE(issue_287_generic_response_error_then_success)
BOOST_TEST(resp.error().diagnostic == "ERR wrong number of arguments for 'set' command");
}
} // namespace
} // namespace

View File

@@ -41,7 +41,7 @@ class test_monitor {
void start_receive()
{
conn.async_receive([this](error_code ec, std::size_t) {
conn.async_receive2([this](error_code ec) {
// We should expect one push entry, at least
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST(monitor_resp.has_value());
@@ -118,4 +118,4 @@ int main()
test_monitor{}.run();
return boost::report_errors();
}
}

View File

@@ -30,6 +30,7 @@ using boost::redis::connection;
using boost::system::error_code;
using boost::redis::request;
using boost::redis::response;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::system::error_code;
@@ -78,8 +79,8 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
run(conn, make_test_config(), {});
conn->async_receive([&, conn](error_code ec, std::size_t) {
std::cout << "async_receive" << std::endl;
conn->async_receive2([&, conn](error_code ec) {
std::cout << "async_receive2" << std::endl;
BOOST_TEST(ec == error_code());
push_received = true;
conn->cancel();
@@ -98,10 +99,12 @@ BOOST_AUTO_TEST_CASE(push_received1)
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
generic_response resp;
conn->set_receive_response(resp);
// Trick: Uses SUBSCRIBE because this command has no response or
// better said, its response is a server push, which is what we
// want to test. We send two because we want to test both
// async_receive and receive.
// want to test.
request req;
req.push("SUBSCRIBE", "channel1");
req.push("SUBSCRIBE", "channel2");
@@ -114,25 +117,13 @@ BOOST_AUTO_TEST_CASE(push_received1)
BOOST_TEST(ec == error_code());
});
conn->async_receive([&, conn](error_code ec, std::size_t) {
conn->async_receive2([&, conn](error_code ec) {
push_received = true;
std::cout << "(1) async_receive" << std::endl;
std::cout << "async_receive2" << std::endl;
BOOST_TEST(ec == error_code());
// Receives the second push synchronously.
error_code ec2;
std::size_t res = 0;
res = conn->receive(ec2);
BOOST_TEST(!ec2);
BOOST_TEST(res != std::size_t(0));
// Tries to receive a third push synchronously.
ec2 = {};
res = conn->receive(ec2);
BOOST_CHECK_EQUAL(
ec2,
boost::redis::make_error_code(boost::redis::error::sync_receive_push_failed));
BOOST_CHECK_EQUAL(resp.value().get_total_msgs(), 2u);
conn->cancel();
});
@@ -164,7 +155,7 @@ BOOST_AUTO_TEST_CASE(push_filtered_out)
BOOST_TEST(ec == error_code());
});
conn->async_receive([&, conn](error_code ec, std::size_t) {
conn->async_receive2([&, conn](error_code ec) {
push_received = true;
BOOST_TEST(ec == error_code());
conn->cancel(operation::reconnection);
@@ -212,7 +203,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
bool push_received = false, exec_finished = false, run_finished = false;
conn->async_receive([&, conn](error_code ec, std::size_t) {
conn->async_receive2([&, conn](error_code ec) {
BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled);
push_received = true;
});
@@ -240,7 +231,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
void launch_push_consumer(std::shared_ptr<connection> conn)
{
conn->async_receive([conn](error_code ec, std::size_t) {
conn->async_receive2([conn](error_code ec) {
if (ec) {
BOOST_TEST(ec == net::experimental::error::channel_cancelled);
return;

View File

@@ -42,16 +42,15 @@ namespace {
// Push consumer
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
std::cout << "uuu" << std::endl;
std::cout << "Entering receiver" << std::endl;
while (conn->will_reconnect()) {
std::cout << "dddd" << std::endl;
// Loop reading Redis pushs messages.
for (;;) {
std::cout << "aaaa" << std::endl;
error_code ec;
co_await conn->async_receive(net::redirect_error(ec));
std::cout << "Reconnect loop" << std::endl;
// Loop reading Redis pushes.
for (error_code ec;;) {
std::cout << "Receive loop" << std::endl;
co_await conn->async_receive2(net::redirect_error(ec));
if (ec) {
std::cout << "Error in async_receive" << std::endl;
std::cout << "Error in async_receive2" << std::endl;
break;
}
}