mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 16:52:08 +00:00
Compare commits
1 Commits
develop
...
better_sub
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a498f041af |
11
README.md
11
README.md
@@ -106,19 +106,18 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
while (conn->will_reconnect()) {
|
||||
|
||||
// Reconnect to channels.
|
||||
co_await conn->async_exec(req, ignore);
|
||||
co_await conn->async_exec(req);
|
||||
|
||||
// Loop reading Redis pushes.
|
||||
for (;;) {
|
||||
error_code ec;
|
||||
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
|
||||
for (error_code ec;;) {
|
||||
co_await conn->async_receive2(resp, redirect_error(ec));
|
||||
if (ec)
|
||||
break; // Connection lost, break so we can reconnect to channels.
|
||||
|
||||
// Use the response resp in some way and then clear it.
|
||||
...
|
||||
|
||||
consume_one(resp);
|
||||
resp.value().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -126,4 +125,4 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
|
||||
## Further reading
|
||||
|
||||
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
|
||||
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
|
||||
|
||||
@@ -117,19 +117,18 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
while (conn->will_reconnect()) {
|
||||
|
||||
// Reconnect to channels.
|
||||
co_await conn->async_exec(req, ignore);
|
||||
co_await conn->async_exec(req);
|
||||
|
||||
// Loop reading Redis pushes.
|
||||
for (;;) {
|
||||
error_code ec;
|
||||
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
|
||||
for (error_code ec;;) {
|
||||
co_await conn->async_receive2(resp, redirect_error(ec));
|
||||
if (ec)
|
||||
break; // Connection lost, break so we can reconnect to channels.
|
||||
|
||||
// Use the response resp in some way and then clear it.
|
||||
// Use the response here and then clear it.
|
||||
...
|
||||
|
||||
consume_one(resp);
|
||||
resp.value().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,11 +30,9 @@ using boost::asio::consign;
|
||||
using boost::asio::detached;
|
||||
using boost::asio::dynamic_buffer;
|
||||
using boost::asio::redirect_error;
|
||||
using boost::asio::use_awaitable;
|
||||
using boost::redis::config;
|
||||
using boost::redis::connection;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::request;
|
||||
using boost::system::error_code;
|
||||
using namespace std::chrono_literals;
|
||||
@@ -52,11 +50,11 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
|
||||
|
||||
while (conn->will_reconnect()) {
|
||||
// Subscribe to channels.
|
||||
co_await conn->async_exec(req, ignore);
|
||||
co_await conn->async_exec(req);
|
||||
|
||||
// Loop reading Redis push messages.
|
||||
for (error_code ec;;) {
|
||||
co_await conn->async_receive(redirect_error(use_awaitable, ec));
|
||||
co_await conn->async_receive2(redirect_error(ec));
|
||||
if (ec)
|
||||
break; // Connection lost, break so we can reconnect to channels.
|
||||
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
|
||||
@@ -74,7 +72,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
|
||||
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
|
||||
request req;
|
||||
req.push("PUBLISH", "channel", msg);
|
||||
co_await conn->async_exec(req, ignore);
|
||||
co_await conn->async_exec(req);
|
||||
msg.erase(0, n);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <boost/redis/connection.hpp>
|
||||
#include <boost/redis/logger.hpp>
|
||||
|
||||
#include <boost/asio/awaitable.hpp>
|
||||
#include <boost/asio/co_spawn.hpp>
|
||||
@@ -23,11 +22,7 @@ namespace asio = boost::asio;
|
||||
using namespace std::chrono_literals;
|
||||
using boost::redis::request;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::consume_one;
|
||||
using boost::redis::logger;
|
||||
using boost::redis::config;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::error;
|
||||
using boost::system::error_code;
|
||||
using boost::redis::connection;
|
||||
using asio::signal_set;
|
||||
@@ -60,24 +55,23 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
|
||||
// Loop while reconnection is enabled
|
||||
while (conn->will_reconnect()) {
|
||||
// Reconnect to the channels.
|
||||
co_await conn->async_exec(req, ignore);
|
||||
co_await conn->async_exec(req);
|
||||
|
||||
// Loop reading Redis pushs messages.
|
||||
// Loop to read Redis push messages.
|
||||
for (error_code ec;;) {
|
||||
// First tries to read any buffered pushes.
|
||||
conn->receive(ec);
|
||||
if (ec == error::sync_receive_push_failed) {
|
||||
ec = {};
|
||||
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
|
||||
}
|
||||
|
||||
// Wait for pushes
|
||||
co_await conn->async_receive2(asio::redirect_error(ec));
|
||||
if (ec)
|
||||
break; // Connection lost, break so we can reconnect to channels.
|
||||
|
||||
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
|
||||
<< resp.value().at(3).value << std::endl;
|
||||
// The response must be consumed without suspending the
|
||||
// coroutine i.e. without the use of async operations.
|
||||
for (auto const& elem: resp.value().get_view())
|
||||
std::cout << elem.value.data << "\n";
|
||||
|
||||
consume_one(resp);
|
||||
std::cout << std::endl;
|
||||
|
||||
resp.value().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -593,7 +593,7 @@ public:
|
||||
return async_run(config{}, std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/** @brief Receives server side pushes asynchronously.
|
||||
/** @brief (Deprecated) Receives server side pushes asynchronously.
|
||||
*
|
||||
* When pushes arrive and there is no `async_receive` operation in
|
||||
* progress, pushed data, requests, and responses will be paused
|
||||
@@ -623,12 +623,76 @@ public:
|
||||
* @param token Completion token.
|
||||
*/
|
||||
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
|
||||
BOOST_DEPRECATED("Please use async_receive2 instead.")
|
||||
auto async_receive(CompletionToken&& token = {})
|
||||
{
|
||||
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/** @brief Receives server pushes synchronously without blocking.
|
||||
/** @brief Wait for server pushes asynchronously
|
||||
*
|
||||
* This function suspends until a server push is received by the
|
||||
* connection. On completion an unspecified number of pushes will
|
||||
* have been added to the response object set with @ref
|
||||
* boost::redis::connection::set_receive_response.
|
||||
*
|
||||
* To prevent receiving an unbound number of pushes the connection
|
||||
* blocks further read operations on the socket when 256 pushes
|
||||
* accumulate internally (we don't make any commitment to this
|
||||
* exact number). When that happens ongoing `async_exec`s and
|
||||
* health-checks won't make any progress and the connection will
|
||||
* eventually timeout. To avoid that Apps should call
|
||||
* `async_receive2` continuously in a loop.
|
||||
*
|
||||
* @Note To avoid deadlocks the task (e.g. coroutine) calling
|
||||
* `async_receive2` should not call `async_exec` in a way where
|
||||
* they could block each other.
|
||||
*
|
||||
* For an example see cpp20_subscriber.cpp. The completion token
|
||||
* must have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(system::error_code);
|
||||
* @endcode
|
||||
*
|
||||
* @par Per-operation cancellation
|
||||
* This operation supports the following cancellation types:
|
||||
*
|
||||
* @li `asio::cancellation_type_t::terminal`.
|
||||
* @li `asio::cancellation_type_t::partial`.
|
||||
* @li `asio::cancellation_type_t::total`.
|
||||
*
|
||||
* Calling `basic_connection::cancel(operation::receive)` will
|
||||
* also cancel any ongoing receive operations.
|
||||
*
|
||||
* @param token Completion token.
|
||||
*/
|
||||
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
|
||||
auto async_receive2(CompletionToken&& token = {})
|
||||
{
|
||||
return
|
||||
impl_->receive_channel_.async_receive(
|
||||
asio::deferred(
|
||||
[&conn = *this](system::error_code ec, std::size_t)
|
||||
{
|
||||
if (!ec) {
|
||||
auto f = [](system::error_code, std::size_t) {
|
||||
// There is no point in checking for errors
|
||||
// here since async_receive just completed
|
||||
// without errors.
|
||||
};
|
||||
|
||||
// We just want to drain the channel.
|
||||
while (conn.impl_->receive_channel_.try_receive(f));
|
||||
}
|
||||
|
||||
return asio::deferred.values(ec);
|
||||
}
|
||||
)
|
||||
)(std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/** @brief (Deprecated) Receives server pushes synchronously without blocking.
|
||||
*
|
||||
* Receives a server push synchronously by calling `try_receive` on
|
||||
* the underlying channel. If the operation fails because
|
||||
@@ -638,6 +702,7 @@ public:
|
||||
* @param ec Contains the error if any occurred.
|
||||
* @returns The number of bytes read from the socket.
|
||||
*/
|
||||
BOOST_DEPRECATED("Please, use async_receive2 instead.")
|
||||
std::size_t receive(system::error_code& ec)
|
||||
{
|
||||
std::size_t size = 0;
|
||||
@@ -837,7 +902,7 @@ public:
|
||||
"the other member functions to interact with the connection.")
|
||||
auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); }
|
||||
|
||||
/// Sets the response object of @ref async_receive operations.
|
||||
/// Sets the response object of @ref async_receive2 operations.
|
||||
template <class Response>
|
||||
void set_receive_response(Response& resp)
|
||||
{
|
||||
@@ -1028,12 +1093,21 @@ public:
|
||||
|
||||
/// @copydoc basic_connection::async_receive
|
||||
template <class CompletionToken = asio::deferred_t>
|
||||
BOOST_DEPRECATED("Please use async_receive2 instead.")
|
||||
auto async_receive(CompletionToken&& token = {})
|
||||
{
|
||||
return impl_.async_receive(std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/// @copydoc basic_connection::async_receive2
|
||||
template <class CompletionToken = asio::deferred_t>
|
||||
auto async_receive2(CompletionToken&& token = {})
|
||||
{
|
||||
return impl_.async_receive2(std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/// @copydoc basic_connection::receive
|
||||
BOOST_DEPRECATED("Please use async_receive2 instead.")
|
||||
std::size_t receive(system::error_code& ec) { return impl_.receive(ec); }
|
||||
|
||||
/**
|
||||
|
||||
@@ -74,7 +74,7 @@ enum class error
|
||||
/// SSL handshake timeout
|
||||
ssl_handshake_timeout,
|
||||
|
||||
/// Can't receive push synchronously without blocking
|
||||
/// (Deprecated) Can't receive push synchronously without blocking
|
||||
sync_receive_push_failed,
|
||||
|
||||
/// Incompatible node depth.
|
||||
|
||||
@@ -70,13 +70,11 @@ using generic_response = adapter::result<std::vector<resp3::node>>;
|
||||
* @param r The response to modify.
|
||||
* @param ec Will be populated in case of error.
|
||||
*/
|
||||
BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.")
|
||||
void consume_one(generic_response& r, system::error_code& ec);
|
||||
|
||||
/**
|
||||
* @brief Throwing overload of `consume_one`.
|
||||
*
|
||||
* @param r The response to modify.
|
||||
*/
|
||||
/// @copydoc consume_one
|
||||
BOOST_DEPRECATED("This function is not needed anymore to consume server pushes.")
|
||||
void consume_one(generic_response& r);
|
||||
|
||||
} // namespace boost::redis
|
||||
|
||||
@@ -7,7 +7,7 @@ if (MSVC)
|
||||
target_compile_options(boost_redis_project_options INTERFACE /bigobj /W4 /WX /wd4459)
|
||||
target_compile_definitions(boost_redis_project_options INTERFACE _WIN32_WINNT=0x0601 _CRT_SECURE_NO_WARNINGS=1)
|
||||
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang" OR CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
|
||||
target_compile_options(boost_redis_project_options INTERFACE -Wall -Wextra -Werror)
|
||||
target_compile_options(boost_redis_project_options INTERFACE -Wall -Wextra -Werror -Wno-deprecated-declarations)
|
||||
endif()
|
||||
|
||||
add_library(boost_redis_src STATIC boost_redis.cpp)
|
||||
|
||||
@@ -72,26 +72,26 @@ void test_exec()
|
||||
BOOST_TEST(exec_finished);
|
||||
}
|
||||
|
||||
template <class Connection>
|
||||
void test_receive()
|
||||
{
|
||||
// Setup
|
||||
asio::io_context ioc;
|
||||
Connection conn{ioc};
|
||||
bool receive_finished = false;
|
||||
generic_response resp;
|
||||
conn.set_receive_response(resp);
|
||||
|
||||
// Call the function with a very short timeout.
|
||||
conn.async_receive(asio::cancel_after(1ms, [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST_EQ(ec, asio::experimental::channel_errc::channel_cancelled);
|
||||
receive_finished = true;
|
||||
}));
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
|
||||
BOOST_TEST(receive_finished);
|
||||
}
|
||||
//template <class Connection>
|
||||
//void test_receive()
|
||||
//{
|
||||
// // Setup
|
||||
// asio::io_context ioc;
|
||||
// Connection conn{ioc};
|
||||
// bool receive_finished = false;
|
||||
// generic_response resp;
|
||||
// conn.set_receive_response(resp);
|
||||
//
|
||||
// // Call the function with a very short timeout.
|
||||
// conn.async_receive(asio::cancel_after(1ms, [&](error_code ec, std::size_t) {
|
||||
// BOOST_TEST_EQ(ec, asio::experimental::channel_errc::channel_cancelled);
|
||||
// receive_finished = true;
|
||||
// }));
|
||||
//
|
||||
// ioc.run_for(test_timeout);
|
||||
//
|
||||
// BOOST_TEST(receive_finished);
|
||||
//}
|
||||
|
||||
} // namespace
|
||||
|
||||
@@ -103,8 +103,8 @@ int main()
|
||||
test_exec<basic_connection<asio::io_context::executor_type>>();
|
||||
test_exec<connection>();
|
||||
|
||||
test_receive<basic_connection<asio::io_context::executor_type>>();
|
||||
test_receive<connection>();
|
||||
//test_receive<basic_connection<asio::io_context::executor_type>>();
|
||||
//test_receive<connection>();
|
||||
|
||||
return boost::report_errors();
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ using error_code = boost::system::error_code;
|
||||
using boost::redis::operation;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::logger;
|
||||
@@ -54,39 +55,44 @@ std::ostream& operator<<(std::ostream& os, usage const& u)
|
||||
|
||||
namespace {
|
||||
|
||||
auto push_consumer(connection& conn, int expected) -> net::awaitable<void>
|
||||
auto
|
||||
receiver(
|
||||
connection& conn,
|
||||
generic_response& resp,
|
||||
std::size_t expected) -> net::awaitable<void>
|
||||
{
|
||||
int c = 0;
|
||||
for (error_code ec;;) {
|
||||
conn.receive(ec);
|
||||
if (ec == error::sync_receive_push_failed) {
|
||||
ec = {};
|
||||
co_await conn.async_receive(net::redirect_error(ec));
|
||||
} else if (!ec) {
|
||||
//std::cout << "Skipping suspension." << std::endl;
|
||||
}
|
||||
|
||||
if (ec) {
|
||||
BOOST_TEST(false, "push_consumer error: " << ec.message());
|
||||
co_return;
|
||||
}
|
||||
if (++c == expected)
|
||||
break;
|
||||
std::size_t push_counter = 0;
|
||||
while (push_counter != expected) {
|
||||
co_await conn.async_receive2();
|
||||
push_counter += resp.value().get_total_msgs();
|
||||
resp.value().clear();
|
||||
}
|
||||
|
||||
conn.cancel();
|
||||
}
|
||||
|
||||
auto echo_session(connection& conn, const request& pubs, int n) -> net::awaitable<void>
|
||||
auto echo_session(connection& conn, const request& req, std::size_t n) -> net::awaitable<void>
|
||||
{
|
||||
for (auto i = 0; i < n; ++i)
|
||||
co_await conn.async_exec(pubs);
|
||||
for (auto i = 0u; i < n; ++i)
|
||||
co_await conn.async_exec(req);
|
||||
}
|
||||
|
||||
void rethrow_on_error(std::exception_ptr exc)
|
||||
{
|
||||
if (exc)
|
||||
if (exc) {
|
||||
BOOST_TEST(false);
|
||||
std::rethrow_exception(exc);
|
||||
}
|
||||
}
|
||||
|
||||
request make_pub_req(std::size_t n_pubs)
|
||||
{
|
||||
request req;
|
||||
req.push("PING");
|
||||
for (std::size_t i = 0u; i < n_pubs; ++i)
|
||||
req.push("PUBLISH", "channel", "payload");
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
@@ -98,22 +104,22 @@ BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
|
||||
// Number of coroutines that will send pings sharing the same
|
||||
// connection to redis.
|
||||
constexpr int sessions = 150;
|
||||
constexpr std::size_t sessions = 150u;
|
||||
|
||||
// The number of pings that will be sent by each session.
|
||||
constexpr int msgs = 200;
|
||||
constexpr std::size_t msgs = 200u;
|
||||
|
||||
// The number of publishes that will be sent by each session with
|
||||
// each message.
|
||||
constexpr int n_pubs = 25;
|
||||
constexpr std::size_t n_pubs = 25u;
|
||||
|
||||
// This is the total number of pushes we will receive.
|
||||
constexpr int total_pushes = sessions * msgs * n_pubs + 1;
|
||||
constexpr std::size_t total_pushes = sessions * msgs * n_pubs + 1;
|
||||
|
||||
request pubs;
|
||||
pubs.push("PING");
|
||||
for (int i = 0; i < n_pubs; ++i)
|
||||
pubs.push("PUBLISH", "channel", "payload");
|
||||
generic_response resp;
|
||||
conn.set_receive_response(resp);
|
||||
|
||||
request const pub_req = make_pub_req(n_pubs);
|
||||
|
||||
// Run the connection
|
||||
bool run_finished = false, subscribe_finished = false;
|
||||
@@ -123,6 +129,10 @@ BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
std::clog << "async_run finished" << std::endl;
|
||||
});
|
||||
|
||||
// Op that will consume the pushes counting down until all expected
|
||||
// pushes have been received.
|
||||
net::co_spawn(ctx, receiver(conn, resp, total_pushes), rethrow_on_error);
|
||||
|
||||
// Subscribe, then launch the coroutines
|
||||
request req;
|
||||
req.push("SUBSCRIBE", "channel");
|
||||
@@ -130,12 +140,8 @@ BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
subscribe_finished = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
|
||||
// Op that will consume the pushes counting down until all expected
|
||||
// pushes have been received.
|
||||
net::co_spawn(ctx, push_consumer(conn, total_pushes), rethrow_on_error);
|
||||
|
||||
for (int i = 0; i < sessions; ++i)
|
||||
net::co_spawn(ctx, echo_session(conn, pubs, msgs), rethrow_on_error);
|
||||
for (std::size_t i = 0; i < sessions; ++i)
|
||||
net::co_spawn(ctx, echo_session(conn, pub_req, msgs), rethrow_on_error);
|
||||
});
|
||||
|
||||
// Run the test
|
||||
@@ -144,7 +150,13 @@ BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
BOOST_TEST(subscribe_finished);
|
||||
|
||||
// Print statistics
|
||||
std::cout << "-------------------\n" << conn.get_usage() << std::endl;
|
||||
std::cout
|
||||
<< "-------------------\n"
|
||||
<< "Usage data: \n"
|
||||
<< conn.get_usage() << "\n"
|
||||
<< "-------------------\n"
|
||||
<< "Reallocations: " << resp.value().get_reallocs()
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
@@ -269,9 +269,9 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
|
||||
generic_response gresp;
|
||||
conn->set_receive_response(gresp);
|
||||
|
||||
auto c3 = [&](error_code ec, std::size_t) {
|
||||
auto c3 = [&](error_code ec) {
|
||||
c3_called = true;
|
||||
std::cout << "async_receive" << std::endl;
|
||||
std::cout << "async_receive2" << std::endl;
|
||||
BOOST_TEST(!ec);
|
||||
BOOST_TEST(gresp.has_error());
|
||||
BOOST_CHECK_EQUAL(gresp.error().data_type, resp3::type::simple_error);
|
||||
@@ -281,7 +281,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
|
||||
conn->cancel(operation::reconnection);
|
||||
};
|
||||
|
||||
conn->async_receive(c3);
|
||||
conn->async_receive2(c3);
|
||||
|
||||
run(conn);
|
||||
|
||||
@@ -326,4 +326,4 @@ BOOST_AUTO_TEST_CASE(issue_287_generic_response_error_then_success)
|
||||
BOOST_TEST(resp.error().diagnostic == "ERR wrong number of arguments for 'set' command");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace
|
||||
|
||||
@@ -41,7 +41,7 @@ class test_monitor {
|
||||
|
||||
void start_receive()
|
||||
{
|
||||
conn.async_receive([this](error_code ec, std::size_t) {
|
||||
conn.async_receive2([this](error_code ec) {
|
||||
// We should expect one push entry, at least
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST(monitor_resp.has_value());
|
||||
@@ -118,4 +118,4 @@ int main()
|
||||
test_monitor{}.run();
|
||||
|
||||
return boost::report_errors();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ using boost::redis::connection;
|
||||
using boost::system::error_code;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::system::error_code;
|
||||
@@ -78,8 +79,8 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
|
||||
|
||||
run(conn, make_test_config(), {});
|
||||
|
||||
conn->async_receive([&, conn](error_code ec, std::size_t) {
|
||||
std::cout << "async_receive" << std::endl;
|
||||
conn->async_receive2([&, conn](error_code ec) {
|
||||
std::cout << "async_receive2" << std::endl;
|
||||
BOOST_TEST(ec == error_code());
|
||||
push_received = true;
|
||||
conn->cancel();
|
||||
@@ -98,10 +99,12 @@ BOOST_AUTO_TEST_CASE(push_received1)
|
||||
net::io_context ioc;
|
||||
auto conn = std::make_shared<connection>(ioc);
|
||||
|
||||
generic_response resp;
|
||||
conn->set_receive_response(resp);
|
||||
|
||||
// Trick: Uses SUBSCRIBE because this command has no response or
|
||||
// better said, its response is a server push, which is what we
|
||||
// want to test. We send two because we want to test both
|
||||
// async_receive and receive.
|
||||
// want to test.
|
||||
request req;
|
||||
req.push("SUBSCRIBE", "channel1");
|
||||
req.push("SUBSCRIBE", "channel2");
|
||||
@@ -114,25 +117,13 @@ BOOST_AUTO_TEST_CASE(push_received1)
|
||||
BOOST_TEST(ec == error_code());
|
||||
});
|
||||
|
||||
conn->async_receive([&, conn](error_code ec, std::size_t) {
|
||||
conn->async_receive2([&, conn](error_code ec) {
|
||||
push_received = true;
|
||||
std::cout << "(1) async_receive" << std::endl;
|
||||
std::cout << "async_receive2" << std::endl;
|
||||
|
||||
BOOST_TEST(ec == error_code());
|
||||
|
||||
// Receives the second push synchronously.
|
||||
error_code ec2;
|
||||
std::size_t res = 0;
|
||||
res = conn->receive(ec2);
|
||||
BOOST_TEST(!ec2);
|
||||
BOOST_TEST(res != std::size_t(0));
|
||||
|
||||
// Tries to receive a third push synchronously.
|
||||
ec2 = {};
|
||||
res = conn->receive(ec2);
|
||||
BOOST_CHECK_EQUAL(
|
||||
ec2,
|
||||
boost::redis::make_error_code(boost::redis::error::sync_receive_push_failed));
|
||||
BOOST_CHECK_EQUAL(resp.value().get_total_msgs(), 2u);
|
||||
|
||||
conn->cancel();
|
||||
});
|
||||
@@ -164,7 +155,7 @@ BOOST_AUTO_TEST_CASE(push_filtered_out)
|
||||
BOOST_TEST(ec == error_code());
|
||||
});
|
||||
|
||||
conn->async_receive([&, conn](error_code ec, std::size_t) {
|
||||
conn->async_receive2([&, conn](error_code ec) {
|
||||
push_received = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->cancel(operation::reconnection);
|
||||
@@ -212,7 +203,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
|
||||
|
||||
bool push_received = false, exec_finished = false, run_finished = false;
|
||||
|
||||
conn->async_receive([&, conn](error_code ec, std::size_t) {
|
||||
conn->async_receive2([&, conn](error_code ec) {
|
||||
BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled);
|
||||
push_received = true;
|
||||
});
|
||||
@@ -240,7 +231,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
|
||||
|
||||
void launch_push_consumer(std::shared_ptr<connection> conn)
|
||||
{
|
||||
conn->async_receive([conn](error_code ec, std::size_t) {
|
||||
conn->async_receive2([conn](error_code ec) {
|
||||
if (ec) {
|
||||
BOOST_TEST(ec == net::experimental::error::channel_cancelled);
|
||||
return;
|
||||
|
||||
@@ -42,16 +42,15 @@ namespace {
|
||||
// Push consumer
|
||||
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
std::cout << "uuu" << std::endl;
|
||||
std::cout << "Entering receiver" << std::endl;
|
||||
while (conn->will_reconnect()) {
|
||||
std::cout << "dddd" << std::endl;
|
||||
// Loop reading Redis pushs messages.
|
||||
for (;;) {
|
||||
std::cout << "aaaa" << std::endl;
|
||||
error_code ec;
|
||||
co_await conn->async_receive(net::redirect_error(ec));
|
||||
std::cout << "Reconnect loop" << std::endl;
|
||||
// Loop reading Redis pushes.
|
||||
for (error_code ec;;) {
|
||||
std::cout << "Receive loop" << std::endl;
|
||||
co_await conn->async_receive2(net::redirect_error(ec));
|
||||
if (ec) {
|
||||
std::cout << "Error in async_receive" << std::endl;
|
||||
std::cout << "Error in async_receive2" << std::endl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user