mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Concludes the work started by Nikolai Vladimirov on the generic_flat_response
This commit is contained in:
@@ -56,6 +56,7 @@ make_test(test_conn_run_cancel)
|
||||
make_test(test_conn_check_health)
|
||||
make_test(test_conn_exec)
|
||||
make_test(test_conn_push)
|
||||
make_test(test_conn_push2)
|
||||
make_test(test_conn_monitor)
|
||||
make_test(test_conn_reconnect)
|
||||
make_test(test_conn_exec_cancel)
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
#include <boost/test/included/unit_test.hpp>
|
||||
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::generic_flat_response;
|
||||
using boost::redis::resp3::flat_tree;
|
||||
using boost::redis::response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::any_adapter;
|
||||
@@ -25,7 +25,7 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types)
|
||||
response<int> r1;
|
||||
response<int, std::string> r2;
|
||||
generic_response r3;
|
||||
generic_flat_response r4;
|
||||
flat_tree r4;
|
||||
|
||||
BOOST_CHECK_NO_THROW(any_adapter{r1});
|
||||
BOOST_CHECK_NO_THROW(any_adapter{r2});
|
||||
|
||||
@@ -262,4 +262,4 @@ int main()
|
||||
test_flexible().run();
|
||||
|
||||
return boost::report_errors();
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
@@ -27,6 +27,7 @@ using error_code = boost::system::error_code;
|
||||
using boost::redis::operation;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::resp3::flat_tree;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::logger;
|
||||
@@ -54,39 +55,44 @@ std::ostream& operator<<(std::ostream& os, usage const& u)
|
||||
|
||||
namespace {
|
||||
|
||||
auto push_consumer(connection& conn, int expected) -> net::awaitable<void>
|
||||
auto
|
||||
receiver(
|
||||
connection& conn,
|
||||
flat_tree& resp,
|
||||
std::size_t expected) -> net::awaitable<void>
|
||||
{
|
||||
int c = 0;
|
||||
for (error_code ec;;) {
|
||||
conn.receive(ec);
|
||||
if (ec == error::sync_receive_push_failed) {
|
||||
ec = {};
|
||||
co_await conn.async_receive(net::redirect_error(ec));
|
||||
} else if (!ec) {
|
||||
//std::cout << "Skipping suspension." << std::endl;
|
||||
}
|
||||
|
||||
if (ec) {
|
||||
BOOST_TEST(false, "push_consumer error: " << ec.message());
|
||||
co_return;
|
||||
}
|
||||
if (++c == expected)
|
||||
break;
|
||||
std::size_t push_counter = 0;
|
||||
while (push_counter != expected) {
|
||||
co_await conn.async_receive2();
|
||||
push_counter += resp.get_total_msgs();
|
||||
resp.clear();
|
||||
}
|
||||
|
||||
conn.cancel();
|
||||
}
|
||||
|
||||
auto echo_session(connection& conn, const request& pubs, int n) -> net::awaitable<void>
|
||||
auto echo_session(connection& conn, const request& req, std::size_t n) -> net::awaitable<void>
|
||||
{
|
||||
for (auto i = 0; i < n; ++i)
|
||||
co_await conn.async_exec(pubs);
|
||||
for (auto i = 0u; i < n; ++i)
|
||||
co_await conn.async_exec(req);
|
||||
}
|
||||
|
||||
void rethrow_on_error(std::exception_ptr exc)
|
||||
{
|
||||
if (exc)
|
||||
if (exc) {
|
||||
BOOST_TEST(false);
|
||||
std::rethrow_exception(exc);
|
||||
}
|
||||
}
|
||||
|
||||
request make_pub_req(std::size_t n_pubs)
|
||||
{
|
||||
request req;
|
||||
req.push("PING");
|
||||
for (std::size_t i = 0u; i < n_pubs; ++i)
|
||||
req.push("PUBLISH", "channel", "payload");
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
@@ -98,22 +104,22 @@ BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
|
||||
// Number of coroutines that will send pings sharing the same
|
||||
// connection to redis.
|
||||
constexpr int sessions = 150;
|
||||
constexpr std::size_t sessions = 150u;
|
||||
|
||||
// The number of pings that will be sent by each session.
|
||||
constexpr int msgs = 200;
|
||||
constexpr std::size_t msgs = 200u;
|
||||
|
||||
// The number of publishes that will be sent by each session with
|
||||
// each message.
|
||||
constexpr int n_pubs = 25;
|
||||
constexpr std::size_t n_pubs = 25u;
|
||||
|
||||
// This is the total number of pushes we will receive.
|
||||
constexpr int total_pushes = sessions * msgs * n_pubs + 1;
|
||||
constexpr std::size_t total_pushes = sessions * msgs * n_pubs + 1;
|
||||
|
||||
request pubs;
|
||||
pubs.push("PING");
|
||||
for (int i = 0; i < n_pubs; ++i)
|
||||
pubs.push("PUBLISH", "channel", "payload");
|
||||
flat_tree resp;
|
||||
conn.set_receive_response(resp);
|
||||
|
||||
request const pub_req = make_pub_req(n_pubs);
|
||||
|
||||
// Run the connection
|
||||
bool run_finished = false, subscribe_finished = false;
|
||||
@@ -123,6 +129,10 @@ BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
std::clog << "async_run finished" << std::endl;
|
||||
});
|
||||
|
||||
// Op that will consume the pushes counting down until all expected
|
||||
// pushes have been received.
|
||||
net::co_spawn(ctx, receiver(conn, resp, total_pushes), rethrow_on_error);
|
||||
|
||||
// Subscribe, then launch the coroutines
|
||||
request req;
|
||||
req.push("SUBSCRIBE", "channel");
|
||||
@@ -130,12 +140,8 @@ BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
subscribe_finished = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
|
||||
// Op that will consume the pushes counting down until all expected
|
||||
// pushes have been received.
|
||||
net::co_spawn(ctx, push_consumer(conn, total_pushes), rethrow_on_error);
|
||||
|
||||
for (int i = 0; i < sessions; ++i)
|
||||
net::co_spawn(ctx, echo_session(conn, pubs, msgs), rethrow_on_error);
|
||||
for (std::size_t i = 0; i < sessions; ++i)
|
||||
net::co_spawn(ctx, echo_session(conn, pub_req, msgs), rethrow_on_error);
|
||||
});
|
||||
|
||||
// Run the test
|
||||
@@ -144,7 +150,13 @@ BOOST_AUTO_TEST_CASE(echo_stress)
|
||||
BOOST_TEST(subscribe_finished);
|
||||
|
||||
// Print statistics
|
||||
std::cout << "-------------------\n" << conn.get_usage() << std::endl;
|
||||
std::cout
|
||||
<< "-------------------\n"
|
||||
<< "Usage data: \n"
|
||||
<< conn.get_usage() << "\n"
|
||||
<< "-------------------\n"
|
||||
<< "Reallocations: " << resp.get_reallocs()
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
@@ -26,7 +26,7 @@
|
||||
namespace net = boost::asio;
|
||||
using boost::redis::config;
|
||||
using boost::redis::connection;
|
||||
using boost::redis::generic_flat_response;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::operation;
|
||||
using boost::redis::request;
|
||||
|
||||
@@ -32,7 +32,7 @@ using boost::redis::operation;
|
||||
using boost::redis::error;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::generic_flat_response;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::logger;
|
||||
|
||||
@@ -1,95 +0,0 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <boost/redis/connection.hpp>
|
||||
|
||||
#include <cstddef>
|
||||
#define BOOST_TEST_MODULE conn_exec_cancel
|
||||
#include <boost/test/included/unit_test.hpp>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#ifdef BOOST_ASIO_HAS_CO_AWAIT
|
||||
|
||||
// NOTE1: Sends hello separately. I have observed that if hello and
|
||||
// blpop are sent toguether, Redis will send the response of hello
|
||||
// right away, not waiting for blpop. That is why we have to send it
|
||||
// separately.
|
||||
|
||||
namespace net = boost::asio;
|
||||
using error_code = boost::system::error_code;
|
||||
using boost::redis::operation;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::generic_flat_response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::config;
|
||||
using boost::redis::logger;
|
||||
using boost::redis::connection;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace {
|
||||
|
||||
auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
|
||||
generic_flat_response gresp;
|
||||
auto conn = std::make_shared<connection>(ex);
|
||||
|
||||
run(conn);
|
||||
|
||||
net::steady_timer st{ex};
|
||||
st.expires_after(std::chrono::seconds{1});
|
||||
|
||||
// See NOTE1.
|
||||
request req0;
|
||||
req0.push("PING", "async_ignore_explicit_cancel_of_req_written");
|
||||
co_await conn->async_exec(req0, gresp);
|
||||
|
||||
request req1;
|
||||
req1.push("BLPOP", "any", 3);
|
||||
|
||||
bool seen = false;
|
||||
conn->async_exec(req1, gresp, [&](error_code ec, std::size_t) {
|
||||
// No error should occur since the cancellation should be ignored
|
||||
std::cout << "async_exec (1): " << ec.message() << std::endl;
|
||||
BOOST_TEST(ec == error_code());
|
||||
seen = true;
|
||||
});
|
||||
|
||||
// Will complete while BLPOP is pending.
|
||||
error_code ec;
|
||||
co_await st.async_wait(net::redirect_error(ec));
|
||||
conn->cancel(operation::exec);
|
||||
|
||||
BOOST_TEST(ec == error_code());
|
||||
|
||||
request req2;
|
||||
req2.push("PING");
|
||||
|
||||
// Test whether the connection remains usable after a call to
|
||||
// cancel(exec).
|
||||
co_await conn->async_exec(req2, gresp, net::redirect_error(ec));
|
||||
conn->cancel();
|
||||
|
||||
BOOST_TEST(ec == error_code());
|
||||
BOOST_TEST(seen);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_ignore_explicit_cancel_of_req_written)
|
||||
{
|
||||
run_coroutine_test(async_ignore_explicit_cancel_of_req_written());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
#else
|
||||
BOOST_AUTO_TEST_CASE(dummy) { }
|
||||
#endif
|
||||
@@ -21,7 +21,7 @@ using error_code = boost::system::error_code;
|
||||
using boost::redis::connection;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::generic_flat_response;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::error;
|
||||
@@ -266,12 +266,12 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
|
||||
|
||||
conn->async_exec(req1, ignore, c1);
|
||||
|
||||
generic_flat_response gresp;
|
||||
generic_response gresp;
|
||||
conn->set_receive_response(gresp);
|
||||
|
||||
auto c3 = [&](error_code ec, std::size_t) {
|
||||
auto c3 = [&](error_code ec) {
|
||||
c3_called = true;
|
||||
std::cout << "async_receive" << std::endl;
|
||||
std::cout << "async_receive2" << std::endl;
|
||||
BOOST_TEST(!ec);
|
||||
BOOST_TEST(gresp.has_error());
|
||||
BOOST_CHECK_EQUAL(gresp.error().data_type, resp3::type::simple_error);
|
||||
@@ -281,7 +281,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
|
||||
conn->cancel(operation::reconnection);
|
||||
};
|
||||
|
||||
conn->async_receive(c3);
|
||||
conn->async_receive2(c3);
|
||||
|
||||
run(conn);
|
||||
|
||||
@@ -326,4 +326,4 @@ BOOST_AUTO_TEST_CASE(issue_287_generic_response_error_then_success)
|
||||
BOOST_TEST(resp.error().diagnostic == "ERR wrong number of arguments for 'set' command");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace
|
||||
|
||||
@@ -41,7 +41,7 @@ class test_monitor {
|
||||
|
||||
void start_receive()
|
||||
{
|
||||
conn.async_receive([this](error_code ec, std::size_t) {
|
||||
conn.async_receive2([this](error_code ec) {
|
||||
// We should expect one push entry, at least
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST(monitor_resp.has_value());
|
||||
@@ -118,4 +118,4 @@ int main()
|
||||
test_monitor{}.run();
|
||||
|
||||
return boost::report_errors();
|
||||
}
|
||||
}
|
||||
|
||||
392
test/test_conn_push2.cpp
Normal file
392
test/test_conn_push2.cpp
Normal file
@@ -0,0 +1,392 @@
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <boost/redis/connection.hpp>
|
||||
#include <boost/redis/logger.hpp>
|
||||
#include <boost/redis/request.hpp>
|
||||
#include <boost/redis/response.hpp>
|
||||
|
||||
#include <boost/asio/experimental/channel_error.hpp>
|
||||
#include <boost/system/errc.hpp>
|
||||
|
||||
#include <string>
|
||||
|
||||
#define BOOST_TEST_MODULE conn_push
|
||||
#include <boost/test/included/unit_test.hpp>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
#include <cstddef>
|
||||
#include <iostream>
|
||||
|
||||
namespace net = boost::asio;
|
||||
namespace redis = boost::redis;
|
||||
|
||||
using boost::redis::operation;
|
||||
using boost::redis::connection;
|
||||
using boost::system::error_code;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::resp3::flat_tree;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::system::error_code;
|
||||
using boost::redis::logger;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace {
|
||||
|
||||
BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
|
||||
{
|
||||
request req1;
|
||||
req1.push("HELLO", 3);
|
||||
req1.push("PING", "Message1");
|
||||
|
||||
request req2;
|
||||
req2.push("SUBSCRIBE", "channel");
|
||||
|
||||
request req3;
|
||||
req3.push("PING", "Message2");
|
||||
req3.push("QUIT");
|
||||
|
||||
net::io_context ioc;
|
||||
|
||||
auto conn = std::make_shared<connection>(ioc);
|
||||
|
||||
bool push_received = false, c1_called = false, c2_called = false, c3_called = false;
|
||||
|
||||
auto c3 = [&](error_code ec, std::size_t) {
|
||||
c3_called = true;
|
||||
std::cout << "c3: " << ec.message() << std::endl;
|
||||
};
|
||||
|
||||
auto c2 = [&, conn](error_code ec, std::size_t) {
|
||||
c2_called = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req3, ignore, c3);
|
||||
};
|
||||
|
||||
auto c1 = [&, conn](error_code ec, std::size_t) {
|
||||
c1_called = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req2, ignore, c2);
|
||||
};
|
||||
|
||||
conn->async_exec(req1, ignore, c1);
|
||||
|
||||
run(conn, make_test_config(), {});
|
||||
|
||||
conn->async_receive2([&, conn](error_code ec) {
|
||||
std::cout << "async_receive2" << std::endl;
|
||||
BOOST_TEST(ec == error_code());
|
||||
push_received = true;
|
||||
conn->cancel();
|
||||
});
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
|
||||
BOOST_TEST(push_received);
|
||||
BOOST_TEST(c1_called);
|
||||
BOOST_TEST(c2_called);
|
||||
BOOST_TEST(c3_called);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(push_received1)
|
||||
{
|
||||
net::io_context ioc;
|
||||
auto conn = std::make_shared<connection>(ioc);
|
||||
|
||||
flat_tree resp;
|
||||
conn->set_receive_response(resp);
|
||||
|
||||
// Trick: Uses SUBSCRIBE because this command has no response or
|
||||
// better said, its response is a server push, which is what we
|
||||
// want to test.
|
||||
request req;
|
||||
req.push("SUBSCRIBE", "channel1");
|
||||
req.push("SUBSCRIBE", "channel2");
|
||||
|
||||
bool push_received = false, exec_finished = false;
|
||||
|
||||
conn->async_exec(req, ignore, [&, conn](error_code ec, std::size_t) {
|
||||
exec_finished = true;
|
||||
std::cout << "async_exec" << std::endl;
|
||||
BOOST_TEST(ec == error_code());
|
||||
});
|
||||
|
||||
conn->async_receive2([&, conn](error_code ec) {
|
||||
push_received = true;
|
||||
std::cout << "async_receive2" << std::endl;
|
||||
|
||||
BOOST_TEST(ec == error_code());
|
||||
|
||||
BOOST_CHECK_EQUAL(resp.get_total_msgs(), 2u);
|
||||
|
||||
conn->cancel();
|
||||
});
|
||||
|
||||
run(conn);
|
||||
ioc.run_for(test_timeout);
|
||||
|
||||
BOOST_TEST(exec_finished);
|
||||
BOOST_TEST(push_received);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(push_filtered_out)
|
||||
{
|
||||
net::io_context ioc;
|
||||
auto conn = std::make_shared<connection>(ioc);
|
||||
|
||||
request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("PING");
|
||||
req.push("SUBSCRIBE", "channel");
|
||||
req.push("QUIT");
|
||||
|
||||
response<ignore_t, std::string, std::string> resp;
|
||||
|
||||
bool exec_finished = false, push_received = false;
|
||||
|
||||
conn->async_exec(req, resp, [conn, &exec_finished](error_code ec, std::size_t) {
|
||||
exec_finished = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
});
|
||||
|
||||
conn->async_receive2([&, conn](error_code ec) {
|
||||
push_received = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->cancel(operation::reconnection);
|
||||
});
|
||||
|
||||
run(conn);
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
BOOST_TEST(exec_finished);
|
||||
BOOST_TEST(push_received);
|
||||
|
||||
BOOST_CHECK_EQUAL(std::get<1>(resp).value(), "PONG");
|
||||
BOOST_CHECK_EQUAL(std::get<2>(resp).value(), "OK");
|
||||
}
|
||||
|
||||
struct response_error_tag { };
|
||||
response_error_tag error_tag_obj;
|
||||
|
||||
struct response_error_adapter {
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
void on_node(
|
||||
boost::redis::resp3::basic_node<std::string_view> const&,
|
||||
boost::system::error_code& ec)
|
||||
{
|
||||
ec = boost::redis::error::incompatible_size;
|
||||
}
|
||||
};
|
||||
|
||||
auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; }
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_push_adapter)
|
||||
{
|
||||
net::io_context ioc;
|
||||
auto conn = std::make_shared<connection>(ioc);
|
||||
|
||||
request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("PING");
|
||||
req.push("SUBSCRIBE", "channel");
|
||||
req.push("PING");
|
||||
|
||||
conn->set_receive_response(error_tag_obj);
|
||||
|
||||
bool push_received = false, exec_finished = false, run_finished = false;
|
||||
|
||||
conn->async_receive2([&, conn](error_code ec) {
|
||||
BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_cancelled);
|
||||
push_received = true;
|
||||
});
|
||||
|
||||
conn->async_exec(req, ignore, [&exec_finished](error_code ec, std::size_t) {
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
exec_finished = true;
|
||||
});
|
||||
|
||||
auto cfg = make_test_config();
|
||||
cfg.reconnect_wait_interval = 0s;
|
||||
conn->async_run(cfg, [&run_finished](error_code ec) {
|
||||
BOOST_CHECK_EQUAL(ec, redis::error::incompatible_size);
|
||||
run_finished = true;
|
||||
});
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
BOOST_TEST(push_received);
|
||||
BOOST_TEST(exec_finished);
|
||||
BOOST_TEST(run_finished);
|
||||
|
||||
// TODO: Reset the ioc reconnect and send a quit to ensure
|
||||
// reconnection is possible after an error.
|
||||
}
|
||||
|
||||
void launch_push_consumer(std::shared_ptr<connection> conn)
|
||||
{
|
||||
conn->async_receive2([conn](error_code ec) {
|
||||
if (ec) {
|
||||
BOOST_TEST(ec == net::experimental::error::channel_cancelled);
|
||||
return;
|
||||
}
|
||||
launch_push_consumer(conn);
|
||||
});
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(many_subscribers)
|
||||
{
|
||||
request req0;
|
||||
req0.get_config().cancel_on_connection_lost = false;
|
||||
req0.push("HELLO", 3);
|
||||
|
||||
request req1;
|
||||
req1.get_config().cancel_on_connection_lost = false;
|
||||
req1.push("PING", "Message1");
|
||||
|
||||
request req2;
|
||||
req2.get_config().cancel_on_connection_lost = false;
|
||||
req2.push("SUBSCRIBE", "channel");
|
||||
|
||||
request req3;
|
||||
req3.get_config().cancel_on_connection_lost = false;
|
||||
req3.push("QUIT");
|
||||
|
||||
net::io_context ioc;
|
||||
auto conn = std::make_shared<connection>(ioc);
|
||||
|
||||
bool finished = false;
|
||||
|
||||
auto c11 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->cancel(operation::reconnection);
|
||||
finished = true;
|
||||
};
|
||||
auto c10 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req3, ignore, c11);
|
||||
};
|
||||
auto c9 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req2, ignore, c10);
|
||||
};
|
||||
auto c8 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req1, ignore, c9);
|
||||
};
|
||||
auto c7 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req2, ignore, c8);
|
||||
};
|
||||
auto c6 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req2, ignore, c7);
|
||||
};
|
||||
auto c5 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req1, ignore, c6);
|
||||
};
|
||||
auto c4 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req2, ignore, c5);
|
||||
};
|
||||
auto c3 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req1, ignore, c4);
|
||||
};
|
||||
auto c2 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req2, ignore, c3);
|
||||
};
|
||||
auto c1 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req2, ignore, c2);
|
||||
};
|
||||
auto c0 = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->async_exec(req1, ignore, c1);
|
||||
};
|
||||
|
||||
conn->async_exec(req0, ignore, c0);
|
||||
launch_push_consumer(conn);
|
||||
|
||||
run(conn, make_test_config(), {});
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
BOOST_TEST(finished);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_unsubscribe)
|
||||
{
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
|
||||
// Subscribe to 3 channels and 2 patterns. Use CLIENT INFO to verify this took effect
|
||||
request req_subscribe;
|
||||
req_subscribe.push("SUBSCRIBE", "ch1", "ch2", "ch3");
|
||||
req_subscribe.push("PSUBSCRIBE", "ch1*", "ch2*");
|
||||
req_subscribe.push("CLIENT", "INFO");
|
||||
|
||||
// Then, unsubscribe from some of them, and verify again
|
||||
request req_unsubscribe;
|
||||
req_unsubscribe.push("UNSUBSCRIBE", "ch1");
|
||||
req_unsubscribe.push("PUNSUBSCRIBE", "ch2*");
|
||||
req_unsubscribe.push("CLIENT", "INFO");
|
||||
|
||||
// Finally, ping to verify that the connection is still usable
|
||||
request req_ping;
|
||||
req_ping.push("PING", "test_unsubscribe");
|
||||
|
||||
response<std::string> resp_subscribe, resp_unsubscribe, resp_ping;
|
||||
|
||||
bool subscribe_finished = false, unsubscribe_finished = false, ping_finished = false,
|
||||
run_finished = false;
|
||||
|
||||
auto on_ping = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
ping_finished = true;
|
||||
BOOST_TEST(std::get<0>(resp_ping).has_value());
|
||||
BOOST_TEST(std::get<0>(resp_ping).value() == "test_unsubscribe");
|
||||
conn.cancel();
|
||||
};
|
||||
|
||||
auto on_unsubscribe = [&](error_code ec, std::size_t) {
|
||||
unsubscribe_finished = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
BOOST_TEST(std::get<0>(resp_unsubscribe).has_value());
|
||||
BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "sub") == "2");
|
||||
BOOST_TEST(find_client_info(std::get<0>(resp_unsubscribe).value(), "psub") == "1");
|
||||
conn.async_exec(req_ping, resp_ping, on_ping);
|
||||
};
|
||||
|
||||
auto on_subscribe = [&](error_code ec, std::size_t) {
|
||||
subscribe_finished = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
BOOST_TEST(std::get<0>(resp_subscribe).has_value());
|
||||
BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "sub") == "3");
|
||||
BOOST_TEST(find_client_info(std::get<0>(resp_subscribe).value(), "psub") == "2");
|
||||
conn.async_exec(req_unsubscribe, resp_unsubscribe, on_unsubscribe);
|
||||
};
|
||||
|
||||
conn.async_exec(req_subscribe, resp_subscribe, on_subscribe);
|
||||
|
||||
conn.async_run(make_test_config(), [&run_finished](error_code ec) {
|
||||
BOOST_TEST(ec == net::error::operation_aborted);
|
||||
run_finished = true;
|
||||
});
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
|
||||
BOOST_TEST(subscribe_finished);
|
||||
BOOST_TEST(unsubscribe_finished);
|
||||
BOOST_TEST(ping_finished);
|
||||
BOOST_TEST(run_finished);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
@@ -42,16 +42,15 @@ namespace {
|
||||
// Push consumer
|
||||
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
std::cout << "uuu" << std::endl;
|
||||
std::cout << "Entering receiver" << std::endl;
|
||||
while (conn->will_reconnect()) {
|
||||
std::cout << "dddd" << std::endl;
|
||||
// Loop reading Redis pushs messages.
|
||||
for (;;) {
|
||||
std::cout << "aaaa" << std::endl;
|
||||
error_code ec;
|
||||
co_await conn->async_receive(net::redirect_error(ec));
|
||||
std::cout << "Reconnect loop" << std::endl;
|
||||
// Loop reading Redis pushes.
|
||||
for (error_code ec;;) {
|
||||
std::cout << "Receive loop" << std::endl;
|
||||
co_await conn->async_receive2(net::redirect_error(ec));
|
||||
if (ec) {
|
||||
std::cout << "Error in async_receive" << std::endl;
|
||||
std::cout << "Error in async_receive2" << std::endl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ using boost::system::error_code;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::generic_flat_response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::adapter::result;
|
||||
@@ -635,7 +634,7 @@ BOOST_AUTO_TEST_CASE(cancel_one_1)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(cancel_one_empty)
|
||||
{
|
||||
generic_flat_response resp;
|
||||
generic_response resp;
|
||||
BOOST_TEST(resp.has_value());
|
||||
|
||||
consume_one(resp);
|
||||
@@ -644,7 +643,7 @@ BOOST_AUTO_TEST_CASE(cancel_one_empty)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(cancel_one_has_error)
|
||||
{
|
||||
generic_flat_response resp = boost::redis::adapter::error{resp3::type::simple_string, {}};
|
||||
generic_response resp = boost::redis::adapter::error{resp3::type::simple_string, {}};
|
||||
BOOST_TEST(resp.has_error());
|
||||
|
||||
consume_one(resp);
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
@@ -22,15 +22,19 @@
|
||||
using boost::redis::request;
|
||||
using boost::redis::adapter::adapt2;
|
||||
using boost::redis::adapter::result;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::resp3::tree;
|
||||
using boost::redis::resp3::flat_tree;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::resp3::detail::deserialize;
|
||||
using boost::redis::resp3::node;
|
||||
using boost::redis::resp3::node_view;
|
||||
using boost::redis::resp3::to_string;
|
||||
using boost::redis::response;
|
||||
using boost::redis::any_adapter;
|
||||
using boost::system::error_code;
|
||||
|
||||
namespace resp3 = boost::redis::resp3;
|
||||
|
||||
#define RESP3_SET_PART1 "~6\r\n+orange\r"
|
||||
#define RESP3_SET_PART2 "\n+apple\r\n+one"
|
||||
#define RESP3_SET_PART3 "\r\n+two\r"
|
||||
@@ -42,7 +46,9 @@ BOOST_AUTO_TEST_CASE(low_level_sync_sans_io)
|
||||
try {
|
||||
result<std::set<std::string>> resp;
|
||||
|
||||
deserialize(resp3_set, adapt2(resp));
|
||||
error_code ec;
|
||||
deserialize(resp3_set, adapt2(resp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
for (auto const& e : resp.value())
|
||||
std::cout << e << std::endl;
|
||||
@@ -65,7 +71,9 @@ BOOST_AUTO_TEST_CASE(issue_210_empty_set)
|
||||
|
||||
char const* wire = "*4\r\n:1\r\n~0\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n";
|
||||
|
||||
deserialize(wire, adapt2(resp));
|
||||
error_code ec;
|
||||
deserialize(wire, adapt2(resp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1);
|
||||
BOOST_CHECK(std::get<1>(resp.value()).value().empty());
|
||||
@@ -91,7 +99,9 @@ BOOST_AUTO_TEST_CASE(issue_210_non_empty_set_size_one)
|
||||
char const*
|
||||
wire = "*4\r\n:1\r\n~1\r\n$3\r\nfoo\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n";
|
||||
|
||||
deserialize(wire, adapt2(resp));
|
||||
error_code ec;
|
||||
deserialize(wire, adapt2(resp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1);
|
||||
BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().size(), 1u);
|
||||
@@ -118,7 +128,9 @@ BOOST_AUTO_TEST_CASE(issue_210_non_empty_set_size_two)
|
||||
char const* wire =
|
||||
"*4\r\n:1\r\n~2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n:2\r\n";
|
||||
|
||||
deserialize(wire, adapt2(resp));
|
||||
error_code ec;
|
||||
deserialize(wire, adapt2(resp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1);
|
||||
BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value().at(0), std::string{"foo"});
|
||||
@@ -140,7 +152,9 @@ BOOST_AUTO_TEST_CASE(issue_210_no_nested)
|
||||
char const*
|
||||
wire = "*4\r\n:1\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$25\r\nthis_should_not_be_in_set\r\n";
|
||||
|
||||
deserialize(wire, adapt2(resp));
|
||||
error_code ec;
|
||||
deserialize(wire, adapt2(resp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
BOOST_CHECK_EQUAL(std::get<0>(resp.value()).value(), 1);
|
||||
BOOST_CHECK_EQUAL(std::get<1>(resp.value()).value(), std::string{"foo"});
|
||||
@@ -159,7 +173,10 @@ BOOST_AUTO_TEST_CASE(issue_233_array_with_null)
|
||||
result<std::vector<std::optional<std::string>>> resp;
|
||||
|
||||
char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n";
|
||||
deserialize(wire, adapt2(resp));
|
||||
|
||||
error_code ec;
|
||||
deserialize(wire, adapt2(resp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
BOOST_CHECK_EQUAL(resp.value().at(0).value(), "one");
|
||||
BOOST_TEST(!resp.value().at(1).has_value());
|
||||
@@ -177,7 +194,10 @@ BOOST_AUTO_TEST_CASE(issue_233_optional_array_with_null)
|
||||
result<std::optional<std::vector<std::optional<std::string>>>> resp;
|
||||
|
||||
char const* wire = "*3\r\n+one\r\n_\r\n+two\r\n";
|
||||
deserialize(wire, adapt2(resp));
|
||||
|
||||
error_code ec;
|
||||
deserialize(wire, adapt2(resp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
BOOST_CHECK_EQUAL(resp.value().value().at(0).value(), "one");
|
||||
BOOST_TEST(!resp.value().value().at(1).has_value());
|
||||
@@ -313,3 +333,116 @@ BOOST_AUTO_TEST_CASE(check_counter_adapter)
|
||||
BOOST_CHECK_EQUAL(node, 7);
|
||||
BOOST_CHECK_EQUAL(done, 1);
|
||||
}
|
||||
|
||||
namespace boost::redis::resp3 {
|
||||
|
||||
template <class String>
|
||||
std::ostream& operator<<(std::ostream& os, basic_node<String> const& nd)
|
||||
{
|
||||
os << "type: " << to_string(nd.data_type) << "\n"
|
||||
<< "aggregate_size: " << nd.aggregate_size << "\n"
|
||||
<< "depth: " << nd.depth << "\n"
|
||||
<< "value: " << nd.value << "\n";
|
||||
return os;
|
||||
}
|
||||
|
||||
template <class String>
|
||||
std::ostream& operator<<(std::ostream& os, basic_tree<String> const& resp)
|
||||
{
|
||||
for (auto const& e: resp)
|
||||
os << e << ",";
|
||||
return os;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
node from_node_view(node_view const& v)
|
||||
{
|
||||
node ret;
|
||||
ret.data_type = v.data_type;
|
||||
ret.aggregate_size = v.aggregate_size;
|
||||
ret.depth = v.depth;
|
||||
ret.value = v.value;
|
||||
return ret;
|
||||
}
|
||||
|
||||
tree from_flat(flat_tree const& resp)
|
||||
{
|
||||
tree ret;
|
||||
for (auto const& e: resp.get_view())
|
||||
ret.push_back(from_node_view(e));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Parses the same data into a tree and a
|
||||
// flat_tree, they should be equal to each other.
|
||||
BOOST_AUTO_TEST_CASE(flat_tree_views_are_set)
|
||||
{
|
||||
tree resp1;
|
||||
flat_tree fresp;
|
||||
|
||||
error_code ec;
|
||||
deserialize(resp3_set, adapt2(resp1), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
deserialize(resp3_set, adapt2(fresp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
BOOST_CHECK_EQUAL(fresp.get_reallocs(), 1u);
|
||||
BOOST_CHECK_EQUAL(fresp.get_total_msgs(), 1u);
|
||||
|
||||
auto const resp2 = from_flat(fresp);
|
||||
BOOST_CHECK_EQUAL(resp1, resp2);
|
||||
}
|
||||
|
||||
// The response should be reusable.
|
||||
BOOST_AUTO_TEST_CASE(flat_tree_reuse)
|
||||
{
|
||||
flat_tree tmp;
|
||||
|
||||
// First use
|
||||
error_code ec;
|
||||
deserialize(resp3_set, adapt2(tmp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
BOOST_CHECK_EQUAL(tmp.get_reallocs(), 1u);
|
||||
BOOST_CHECK_EQUAL(tmp.get_total_msgs(), 1u);
|
||||
|
||||
// Copy to compare after the reuse.
|
||||
auto const resp1 = tmp.get_view();
|
||||
tmp.clear();
|
||||
|
||||
// Second use
|
||||
deserialize(resp3_set, adapt2(tmp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
// No reallocation this time
|
||||
BOOST_CHECK_EQUAL(tmp.get_reallocs(), 0u);
|
||||
BOOST_CHECK_EQUAL(tmp.get_total_msgs(), 1u);
|
||||
|
||||
BOOST_CHECK_EQUAL(resp1, tmp.get_view());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(flat_tree_copy_assign)
|
||||
{
|
||||
flat_tree resp;
|
||||
|
||||
error_code ec;
|
||||
deserialize(resp3_set, adapt2(resp), ec);
|
||||
BOOST_CHECK_EQUAL(ec, error_code{});
|
||||
|
||||
// Copy
|
||||
resp3::flat_tree copy1{resp};
|
||||
|
||||
// Copy assignment
|
||||
resp3::flat_tree copy2 = resp;
|
||||
|
||||
// Assignment
|
||||
resp3::flat_tree copy3;
|
||||
copy3 = resp;
|
||||
|
||||
BOOST_TEST((copy1 == resp));
|
||||
BOOST_TEST((copy2 == resp));
|
||||
BOOST_TEST((copy3 == resp));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user