2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Adds reliable cancellation support to async_exec (#310)

* Terminal cancellation in async_exec no longer tears down the connection when the cancelled request has been sent to the server.
* Adds support for partial cancellation in async_exec with the same semantics.
This commit is contained in:
Anarthal (Rubén Pérez)
2025-09-26 12:50:42 +02:00
committed by GitHub
parent bcf120bd8f
commit f955dc01d2
13 changed files with 760 additions and 410 deletions

View File

@@ -54,7 +54,6 @@ make_test(test_conn_exec)
make_test(test_conn_push)
make_test(test_conn_reconnect)
make_test(test_conn_exec_cancel)
make_test(test_conn_exec_cancel2)
make_test(test_conn_echo_stress)
make_test(test_conn_move)
make_test(test_conn_setup)

View File

@@ -122,25 +122,6 @@ BOOST_AUTO_TEST_CASE(wrong_response_data_type)
BOOST_TEST(finished);
}
BOOST_AUTO_TEST_CASE(cancel_request_if_not_connected)
{
request req;
req.get_config().cancel_if_not_connected = true;
req.push("PING");
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
bool finished = false;
conn->async_exec(req, ignore, [conn, &finished](error_code ec, std::size_t) {
BOOST_TEST(ec, boost::redis::error::not_connected);
conn->cancel();
finished = true;
});
ioc.run_for(test_timeout);
BOOST_TEST(finished);
}
BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170)
{
// See https://github.com/boostorg/redis/issues/170

View File

@@ -7,33 +7,27 @@
#include <boost/redis/connection.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/core/lightweight_test.hpp>
#include <boost/system/errc.hpp>
#include <cstddef>
#define BOOST_TEST_MODULE conn_exec_cancel
#include <boost/asio/detached.hpp>
#include <boost/test/included/unit_test.hpp>
#include "common.hpp"
#ifdef BOOST_ASIO_HAS_CO_AWAIT
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <cstddef>
#include <iostream>
using namespace std::chrono_literals;
// NOTE1: I have observed that if hello and
// blpop are sent together, Redis will send the response of hello
// right away, not waiting for blpop.
namespace net = boost::asio;
using error_code = boost::system::error_code;
using namespace net::experimental::awaitable_operators;
using boost::redis::operation;
using boost::redis::error;
using boost::redis::request;
@@ -45,95 +39,15 @@ using boost::redis::logger;
using boost::redis::connection;
using namespace std::chrono_literals;
// TODO: replace this by connection once it supports asio::cancel_after
// See https://github.com/boostorg/redis/issues/226
using connection_type = boost::redis::basic_connection<net::any_io_executor>;
namespace {
auto implicit_cancel_of_req_written() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto cfg = make_test_config();
cfg.health_check_interval = std::chrono::seconds::zero();
run(conn, cfg);
// See NOTE1.
request req0;
req0.push("PING");
co_await conn->async_exec(req0, ignore);
// Will be cancelled after it has been written but before the
// response arrives.
request req1;
req1.push("BLPOP", "any", 3);
net::steady_timer st{ex};
st.expires_after(std::chrono::seconds{1});
// Achieves implicit cancellation when the timer fires.
boost::system::error_code ec1, ec2;
co_await (conn->async_exec(req1, ignore, redir(ec1)) || st.async_wait(redir(ec2)));
conn->cancel();
// I have observed this produces terminal cancellation so it can't
// be ignored, an error is expected.
BOOST_TEST(ec1 == net::error::operation_aborted);
BOOST_TEST(ec2 == error_code());
}
BOOST_AUTO_TEST_CASE(test_ignore_implicit_cancel_of_req_written)
{
run_coroutine_test(implicit_cancel_of_req_written());
}
BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled)
{
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
request req0;
req0.push("PING");
// Sends a request that will be blocked forever, so we can test
// canceling it while waiting for a response.
request req1;
req1.get_config().cancel_on_connection_lost = true;
req1.get_config().cancel_if_unresponded = true;
req1.push("BLPOP", "any", 0);
bool finished = false;
auto c1 = [&](error_code ec, std::size_t) {
BOOST_CHECK_EQUAL(ec, net::error::operation_aborted);
finished = true;
};
auto c0 = [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->async_exec(req1, ignore, c1);
};
conn->async_exec(req0, ignore, c0);
auto cfg = make_test_config();
cfg.health_check_interval = std::chrono::seconds{5};
run(conn);
net::steady_timer st{ioc};
st.expires_after(std::chrono::seconds{1});
st.async_wait([&](error_code ec) {
BOOST_TEST(ec == error_code());
conn->cancel(operation::run);
conn->cancel(operation::reconnection);
});
ioc.run_for(test_timeout);
BOOST_TEST(finished);
}
// We can cancel requests that haven't been written yet.
// All cancellation types are supported here.
BOOST_AUTO_TEST_CASE(test_cancel_pending)
void test_cancel_pending()
{
struct {
const char* name;
@@ -145,38 +59,247 @@ BOOST_AUTO_TEST_CASE(test_cancel_pending)
};
for (const auto& tc : test_cases) {
BOOST_TEST_CONTEXT(tc.name)
{
// Setup
net::io_context ctx;
connection conn(ctx);
request req;
req.push("get", "mykey");
std::cerr << "Running test case: " << tc.name << std::endl;
// Issue a request without calling async_run(), so the request stays waiting forever
net::cancellation_signal sig;
bool called = false;
conn.async_exec(
req,
ignore,
net::bind_cancellation_slot(sig.slot(), [&](error_code ec, std::size_t sz) {
BOOST_TEST(ec == net::error::operation_aborted);
BOOST_TEST(sz == 0u);
called = true;
}));
// Setup
net::io_context ctx;
connection conn(ctx);
request req;
req.push("get", "mykey");
// Issue a cancellation
sig.emit(tc.cancel_type);
// Issue a request without calling async_run(), so the request stays waiting forever
net::cancellation_signal sig;
bool called = false;
conn.async_exec(
req,
ignore,
net::bind_cancellation_slot(sig.slot(), [&](error_code ec, std::size_t sz) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
BOOST_TEST_EQ(sz, 0u);
called = true;
}));
// Prevent the test for deadlocking in case of failure
ctx.run_for(3s);
BOOST_TEST(called);
}
// Issue a cancellation
sig.emit(tc.cancel_type);
// Prevent the test for deadlocking in case of failure
ctx.run_for(test_timeout);
BOOST_TEST(called);
}
}
// We can cancel requests that have been written but which
// responses haven't been received yet.
// Terminal and partial cancellation types are supported here.
void test_cancel_written()
{
// Setup
net::io_context ctx;
connection_type conn{ctx};
auto cfg = make_test_config();
cfg.health_check_interval = std::chrono::seconds::zero();
bool run_finished = false, exec1_finished = false, exec2_finished = false,
exec3_finished = false;
// Will be cancelled after it has been written but before the
// response arrives. Create everything in dynamic memory to verify
// we don't try to access things after completion.
auto req1 = std::make_unique<request>();
req1->push("BLPOP", "any", 1);
auto r1 = std::make_unique<response<std::string>>();
// Will be cancelled too because it's sent after BLPOP.
// Tests that partial cancellation is supported, too.
request req2;
req2.push("PING", "partial_cancellation");
// Will finish successfully once the response to the BLPOP arrives
request req3;
req3.push("PING", "after_blpop");
response<std::string> r3;
// Run the connection
conn.async_run(cfg, [&](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
run_finished = true;
});
// The request will be cancelled before it receives a response.
// Our BLPOP will wait for longer than the timeout we're using.
// Clear allocated memory to check we don't access the request or
// response when the server response arrives.
auto blpop_cb = [&](error_code ec, std::size_t) {
req1.reset();
r1.reset();
BOOST_TEST_EQ(ec, net::error::operation_aborted);
exec1_finished = true;
};
conn.async_exec(*req1, *r1, net::cancel_after(500ms, blpop_cb));
// The first PING will be cancelled, too. Use partial cancellation here.
auto req2_cb = [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
exec2_finished = true;
};
conn.async_exec(
req2,
ignore,
net::cancel_after(500ms, net::cancellation_type_t::partial, req2_cb));
// The second PING's response will be received after the BLPOP's response,
// but it will be processed successfully.
conn.async_exec(req3, r3, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(std::get<0>(r3).value(), "after_blpop");
conn.cancel();
exec3_finished = true;
});
ctx.run_for(test_timeout);
BOOST_TEST(run_finished);
BOOST_TEST(exec1_finished);
BOOST_TEST(exec2_finished);
BOOST_TEST(exec3_finished);
}
// Requests configured to do so are cancelled if the connection
// hasn't been established when they are executed
void test_cancel_if_not_connected()
{
net::io_context ioc;
connection conn{ioc};
request req;
req.get_config().cancel_if_not_connected = true;
req.push("PING");
bool exec_finished = false;
conn.async_exec(req, ignore, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error::not_connected);
exec_finished = true;
});
ioc.run_for(test_timeout);
BOOST_TEST(exec_finished);
}
// Requests configured to do so are cancelled when the connection is lost.
// Tests with a written request that hasn't been responded yet
void test_cancel_on_connection_lost_written()
{
// Setup
net::io_context ioc;
connection conn{ioc};
// req0 and req1 will be coalesced together. When req0
// completes, we know that req1 will be waiting for a response.
// req1 will block forever.
request req0;
req0.push("PING");
request req1;
req1.get_config().cancel_on_connection_lost = true;
req1.get_config().cancel_if_unresponded = true;
req1.push("BLPOP", "any", 0);
bool run_finished = false, exec0_finished = false, exec1_finished = false;
// Run the connection
auto cfg = make_test_config();
conn.async_run(cfg, [&](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
run_finished = true;
});
// Execute both requests
conn.async_exec(req0, ignore, [&](error_code ec, std::size_t) {
// The request finished successfully
BOOST_TEST_EQ(ec, error_code());
exec0_finished = true;
// We know that req1 has been written to the server, too. Trigger a cancellation
conn.cancel(operation::run);
conn.cancel(operation::reconnection);
});
conn.async_exec(req1, ignore, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
exec1_finished = true;
});
ioc.run_for(test_timeout);
BOOST_TEST(run_finished);
BOOST_TEST(exec0_finished);
BOOST_TEST(exec1_finished);
}
// connection::cancel(operation::exec) works. Pending requests are cancelled,
// but written requests are not
void test_cancel_operation_exec()
{
// Setup
net::io_context ctx;
connection conn{ctx};
bool run_finished = false, exec0_finished = false, exec1_finished = false,
exec2_finished = false;
request req0;
req0.push("PING", "before_blpop");
request req1;
req1.push("BLPOP", "any", 1);
generic_response r1;
request req2;
req2.push("PING", "after_blpop");
// Run the connection
conn.async_run(make_test_config(), [&](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
run_finished = true;
});
// Execute req0 and req1. They will be coalesced together.
// When req0 completes, we know that req1 will be waiting its response
conn.async_exec(req0, ignore, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error_code());
exec0_finished = true;
conn.cancel(operation::exec);
});
// By default, ignore will issue an error when a NULL is received.
// ATM, this causes the connection to be torn down. Using a generic_response avoids this.
// See https://github.com/boostorg/redis/issues/314
conn.async_exec(req1, r1, [&](error_code ec, std::size_t) {
// No error should occur since the cancellation should be ignored
std::cout << "async_exec (1): " << ec.message() << std::endl;
BOOST_TEST_EQ(ec, error_code());
exec1_finished = true;
// The connection remains usable
conn.async_exec(req2, ignore, [&](error_code ec2, std::size_t) {
BOOST_TEST_EQ(ec2, error_code());
exec2_finished = true;
conn.cancel();
});
});
ctx.run_for(test_timeout);
BOOST_TEST(run_finished);
BOOST_TEST(exec0_finished);
BOOST_TEST(exec1_finished);
BOOST_TEST(exec2_finished);
}
} // namespace
#else
BOOST_AUTO_TEST_CASE(dummy) { }
#endif
int main()
{
test_cancel_pending();
test_cancel_written();
test_cancel_if_not_connected();
test_cancel_on_connection_lost_written();
test_cancel_operation_exec();
return boost::report_errors();
}

View File

@@ -1,95 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
#include <cstddef>
#define BOOST_TEST_MODULE conn_exec_cancel
#include <boost/test/included/unit_test.hpp>
#include "common.hpp"
#include <iostream>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
// NOTE1: Sends hello separately. I have observed that if hello and
// blpop are sent toguether, Redis will send the response of hello
// right away, not waiting for blpop. That is why we have to send it
// separately.
namespace net = boost::asio;
using error_code = boost::system::error_code;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::config;
using boost::redis::logger;
using boost::redis::connection;
using namespace std::chrono_literals;
namespace {
auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
generic_response gresp;
auto conn = std::make_shared<connection>(ex);
run(conn);
net::steady_timer st{ex};
st.expires_after(std::chrono::seconds{1});
// See NOTE1.
request req0;
req0.push("PING", "async_ignore_explicit_cancel_of_req_written");
co_await conn->async_exec(req0, gresp);
request req1;
req1.push("BLPOP", "any", 3);
bool seen = false;
conn->async_exec(req1, gresp, [&](error_code ec, std::size_t) {
// No error should occur since the cancellation should be ignored
std::cout << "async_exec (1): " << ec.message() << std::endl;
BOOST_TEST(ec == error_code());
seen = true;
});
// Will complete while BLPOP is pending.
error_code ec;
co_await st.async_wait(net::redirect_error(ec));
conn->cancel(operation::exec);
BOOST_TEST(ec == error_code());
request req2;
req2.push("PING");
// Test whether the connection remains usable after a call to
// cancel(exec).
co_await conn->async_exec(req2, gresp, net::redirect_error(ec));
conn->cancel();
BOOST_TEST(ec == error_code());
BOOST_TEST(seen);
}
BOOST_AUTO_TEST_CASE(test_ignore_explicit_cancel_of_req_written)
{
run_coroutine_test(async_ignore_explicit_cancel_of_req_written());
}
} // namespace
#else
BOOST_AUTO_TEST_CASE(dummy) { }
#endif

View File

@@ -44,6 +44,7 @@ net::awaitable<void> test_reconnect_impl()
request regular_req;
regular_req.push("PING", "SomeValue");
regular_req.get_config().cancel_on_connection_lost = false;
regular_req.get_config().cancel_if_unresponded = false;
auto conn = std::make_shared<connection>(ex);
auto cfg = make_test_config();

View File

@@ -281,56 +281,26 @@ void test_cancel_waiting()
}
}
// If the request is being processed and terminal cancellation got requested, we cancel the connection
void test_cancel_notwaiting_terminal()
{
// Setup
multiplexer mpx;
elem_and_request input;
exec_fsm fsm(mpx, std::move(input.elm));
// Initiate
auto act = fsm.resume(false, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::setup_cancellation);
act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::notify_writer);
act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// The multiplexer starts writing the request
BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write
// A cancellation arrives
act = fsm.resume(true, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, exec_action_type::cancel_run);
act = fsm.resume(true, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted));
// The object needs to survive here, otherwise an inconsistent connection state is created
}
// If the request is being processed and other types of cancellation got requested, we ignore the cancellation
void test_cancel_notwaiting_notterminal()
// If the request is being processed and terminal or partial
// cancellation is requested, we mark the request as abandoned
void test_cancel_notwaiting_terminal_partial()
{
constexpr struct {
const char* name;
asio::cancellation_type_t type;
} test_cases[] = {
{"partial", asio::cancellation_type_t::partial },
{"total", asio::cancellation_type_t::total },
{"mixed", asio::cancellation_type_t::partial | asio::cancellation_type_t::total},
{"terminal", asio::cancellation_type_t::terminal},
{"partial", asio::cancellation_type_t::partial },
};
for (const auto& tc : test_cases) {
// Setup
multiplexer mpx;
elem_and_request input;
exec_fsm fsm(mpx, std::move(input.elm));
error_code ec;
auto input = std::make_unique<elem_and_request>();
exec_fsm fsm(mpx, std::move(input->elm));
// Initiate
auto act = fsm.resume(true, cancellation_type_t::none);
auto act = fsm.resume(false, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::setup_cancellation, tc.name);
act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::notify_writer, tc.name);
@@ -338,30 +308,67 @@ void test_cancel_notwaiting_notterminal()
act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name);
// Simulate a successful write
// The multiplexer starts writing the request
BOOST_TEST_EQ_MSG(mpx.prepare_write(), 1u, tc.name);
BOOST_TEST_EQ_MSG(mpx.commit_write(), 0u, tc.name); // all requests expect a response
BOOST_TEST_EQ_MSG(mpx.commit_write(), 0u, tc.name);
// We got requested a cancellation here, but we can't honor it
// A cancellation arrives
act = fsm.resume(true, tc.type);
BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name);
BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted));
input.reset(); // Verify we don't access the request or response after completion
// Simulate a successful read
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
// When the response to this request arrives, it gets ignored
error_code ec;
auto res = mpx.consume_next("-ERR wrong command\r\n", ec);
BOOST_TEST_EQ_MSG(ec, error_code(), tc.name);
BOOST_TEST_EQ_MSG(req_status.first, consume_result::got_response, tc.name);
BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed
BOOST_TEST_EQ_MSG(input.done_calls, 1u, tc.name);
BOOST_TEST_EQ_MSG(res.first, consume_result::got_response, tc.name);
// This will awaken the exec operation, and should complete the operation
act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action(error_code(), 11u), tc.name);
// All memory should have been freed by now
BOOST_TEST_EQ_MSG(input.weak_elm.expired(), true, tc.name);
// The multiplexer::elem object needs to survive here to mark the
// request as abandoned
}
}
// If the request is being processed and total cancellation is requested, we ignore the cancellation
void test_cancel_notwaiting_total()
{
// Setup
multiplexer mpx;
elem_and_request input;
exec_fsm fsm(mpx, std::move(input.elm));
error_code ec;
// Initiate
auto act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::setup_cancellation);
act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::notify_writer);
act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful write
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
// We got requested a cancellation here, but we can't honor it
act = fsm.resume(true, asio::cancellation_type_t::total);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful read
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
BOOST_TEST_EQ(input.done_calls, 1u);
// This will awaken the exec operation, and should complete the operation
act = fsm.resume(true, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action(error_code(), 11u));
// All memory should have been freed by now
BOOST_TEST_EQ(input.weak_elm.expired(), true);
}
} // namespace
int main()
@@ -371,8 +378,8 @@ int main()
test_cancel_if_not_connected();
test_not_connected();
test_cancel_waiting();
test_cancel_notwaiting_terminal();
test_cancel_notwaiting_notterminal();
test_cancel_notwaiting_terminal_partial();
test_cancel_notwaiting_total();
return boost::report_errors();
}

View File

@@ -12,10 +12,10 @@
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/response.hpp>
#include <boost/assert/source_location.hpp>
#include <boost/core/lightweight_test.hpp>
#include <iostream>
#include <iterator>
#include <memory>
#include <ostream>
#include <string>
@@ -65,20 +65,48 @@ struct test_item {
std::shared_ptr<multiplexer::elem> elem_ptr;
bool done = false;
test_item(bool cmd_with_response = true)
static request make_request(bool cmd_with_response = true)
{
request ret;
// The exact command is irrelevant because it is not being sent
// to Redis.
req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg");
ret.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg");
return ret;
}
explicit test_item(request request_value)
: req{std::move(request_value)}
{
elem_ptr = std::make_shared<multiplexer::elem>(req, any_adapter{resp});
elem_ptr->set_done_callback([this]() {
done = true;
});
}
test_item(bool cmd_with_response = true)
: test_item(make_request(cmd_with_response))
{ }
};
void check_response(
const generic_response& actual,
boost::span<const node> expected,
boost::source_location loc = BOOST_CURRENT_LOCATION)
{
if (!BOOST_TEST(actual.has_value())) {
std::cerr << "Response has error: " << actual.error().diagnostic << "\n"
<< "Called from " << loc << std::endl;
return;
}
if (!BOOST_TEST_ALL_EQ(actual->begin(), actual->end(), expected.begin(), expected.end())) {
std::cerr << "Called from " << loc << std::endl;
}
}
void test_request_needs_more()
{
// Setup
@@ -100,16 +128,10 @@ void test_request_needs_more()
// Parse the rest of it
ret = mpx.consume_next("$11\r\nhello world\r\n", ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST(item1.resp.has_value());
const node expected[] = {
{type::blob_string, 1u, 0u, "hello world"},
};
BOOST_TEST_ALL_EQ(
item1.resp->begin(),
item1.resp->end(),
std::begin(expected),
std::end(expected));
check_response(item1.resp, expected);
}
void test_several_requests()
@@ -239,14 +261,12 @@ void test_push()
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 16u);
BOOST_TEST(resp.has_value());
const node expected[] = {
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "one"},
{type::simple_string, 1u, 1u, "two"},
};
BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected));
check_response(resp, expected);
}
void test_push_needs_more()
@@ -272,14 +292,12 @@ void test_push_needs_more()
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 16u);
BOOST_TEST(resp.has_value());
const node expected[] = {
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "one"},
{type::simple_string, 1u, 1u, "two"},
};
BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected));
check_response(resp, expected);
}
// If a response is received and no request is waiting, it is interpreted
@@ -298,12 +316,10 @@ void test_push_heuristics_no_request()
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 14u);
BOOST_TEST(resp.has_value());
const node expected[] = {
{type::simple_string, 1u, 0u, "Hello world"},
};
BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected));
check_response(resp, expected);
}
// Same, but there's another request that hasn't been written yet.
@@ -326,12 +342,10 @@ void test_push_heuristics_request_waiting()
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 14u);
BOOST_TEST(resp.has_value());
const node expected[] = {
{type::simple_string, 1u, 0u, "Hello world"},
};
BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected));
check_response(resp, expected);
}
// If a response is received and the first request doesn't expect a response,
@@ -385,13 +399,12 @@ void test_mix_responses_pushes()
auto ret = mpx.consume_next(push1_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 16u);
BOOST_TEST(push_resp.has_value());
std::vector<node> expected{
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "one"},
{type::simple_string, 1u, 1u, "two"},
};
BOOST_TEST_ALL_EQ(push_resp->begin(), push_resp->end(), expected.begin(), expected.end());
check_response(push_resp, expected);
BOOST_TEST_NOT(item1.done);
BOOST_TEST_NOT(item2.done);
@@ -400,11 +413,10 @@ void test_mix_responses_pushes()
ret = mpx.consume_next(response1_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, 18u);
BOOST_TEST(item1.resp.has_value());
expected = {
{type::blob_string, 1u, 0u, "Hello world"},
};
BOOST_TEST_ALL_EQ(item1.resp->begin(), item1.resp->end(), expected.begin(), expected.end());
check_response(item1.resp, expected);
BOOST_TEST(item1.done);
BOOST_TEST_NOT(item2.done);
@@ -413,7 +425,6 @@ void test_mix_responses_pushes()
ret = mpx.consume_next(push2_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 19u);
BOOST_TEST(push_resp.has_value());
expected = {
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "one" },
@@ -422,7 +433,7 @@ void test_mix_responses_pushes()
{type::simple_string, 1u, 1u, "other"},
{type::simple_string, 1u, 1u, "push" },
};
BOOST_TEST_ALL_EQ(push_resp->begin(), push_resp->end(), expected.begin(), expected.end());
check_response(push_resp, expected);
BOOST_TEST(item1.done);
BOOST_TEST_NOT(item2.done);
@@ -431,11 +442,10 @@ void test_mix_responses_pushes()
ret = mpx.consume_next(response2_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, 14u);
BOOST_TEST(item2.resp.has_value());
expected = {
{type::blob_string, 1u, 0u, "Response"},
};
BOOST_TEST_ALL_EQ(item2.resp->begin(), item2.resp->end(), expected.begin(), expected.end());
check_response(item2.resp, expected);
BOOST_TEST(item1.done);
BOOST_TEST(item2.done);
@@ -449,6 +459,244 @@ void test_mix_responses_pushes()
BOOST_TEST_EQ(usg.push_bytes_received, push1_buffer.size() + push2_buffer.size());
}
// Cancellation cases
// If the request is waiting, we just remove it
void test_cancel_waiting()
{
// Setup
multiplexer mpx;
auto item1 = std::make_unique<test_item>();
auto item2 = std::make_unique<test_item>();
mpx.add(item1->elem_ptr);
mpx.add(item2->elem_ptr);
// Cancel the first request
mpx.cancel(item1->elem_ptr);
item1.reset(); // Verify we don't reference this item anyhow
// We can progress the other request normally
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
error_code ec;
auto res = mpx.consume_next("$11\r\nHello world\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
{type::blob_string, 1u, 0u, "Hello world"},
};
check_response(item2->resp, expected);
}
// If the request is staged, we mark it as abandoned
void test_cancel_staged()
{
// Setup
multiplexer mpx;
auto item1 = std::make_unique<test_item>();
auto item2 = std::make_unique<test_item>();
mpx.add(item1->elem_ptr);
mpx.add(item2->elem_ptr);
// A write starts
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
// Cancel the first request
mpx.cancel(item1->elem_ptr);
item1.reset(); // Verify we don't reference this item anyhow
// The write gets confirmed
BOOST_TEST_EQ(mpx.commit_write(), 0u);
// The cancelled request's response arrives. It gets discarded
error_code ec;
auto res = mpx.consume_next("+Goodbye\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item2->done);
// The 2nd request's response arrives. It gets parsed successfully
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
{type::blob_string, 1u, 0u, "Hello world"},
};
check_response(item2->resp, expected);
}
// If the request is staged but didn't expect a response, we remove it
void test_cancel_staged_command_without_response()
{
// Setup
multiplexer mpx;
auto item1 = std::make_unique<test_item>(false);
auto item2 = std::make_unique<test_item>();
mpx.add(item1->elem_ptr);
mpx.add(item2->elem_ptr);
// A write starts
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
// Cancel the first request
mpx.cancel(item1->elem_ptr);
item1.reset(); // Verify we don't reference this item anyhow
// The write gets confirmed
BOOST_TEST_EQ(mpx.commit_write(), 1u);
// The 2nd request's response arrives. It gets parsed successfully
error_code ec;
auto res = mpx.consume_next("$11\r\nHello world\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
{type::blob_string, 1u, 0u, "Hello world"},
};
check_response(item2->resp, expected);
}
// If the request is written, we mark it as abandoned
void test_cancel_written()
{
// Setup
multiplexer mpx;
auto item1 = std::make_unique<test_item>();
auto item2 = std::make_unique<test_item>();
mpx.add(item1->elem_ptr);
mpx.add(item2->elem_ptr);
// A write succeeds
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
// Cancel the first request
mpx.cancel(item1->elem_ptr);
item1.reset(); // Verify we don't reference this item anyhow
// The cancelled request's response arrives. It gets discarded
error_code ec;
auto res = mpx.consume_next("+Goodbye\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item2->done);
// The 2nd request's response arrives. It gets parsed successfully
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
{type::blob_string, 1u, 0u, "Hello world"},
};
check_response(item2->resp, expected);
}
// Having a written request for which part of its response
// has been received doesn't cause trouble
void test_cancel_written_half_parsed_response()
{
// Setup
request req;
req.push("PING", "value1");
req.push("PING", "value2");
req.push("PING", "value3");
multiplexer mpx;
auto item1 = std::make_unique<test_item>(std::move(req));
auto item2 = std::make_unique<test_item>();
mpx.add(item1->elem_ptr);
mpx.add(item2->elem_ptr);
// A write succeeds
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
// Get the response for the 1st command in req1
error_code ec;
auto res = mpx.consume_next("+Goodbye\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item1->done);
BOOST_TEST_EQ(ec, error_code());
// Get a partial response for the 2nd command in req1
res = mpx.consume_next("*2\r\n$4\r\nsome\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::needs_more);
BOOST_TEST_NOT(item1->done);
BOOST_TEST_EQ(ec, error_code());
// Cancel the first request
mpx.cancel(item1->elem_ptr);
item1.reset(); // Verify we don't reference this item anyhow
// Get the rest of the response for the 2nd command in req1
res = mpx.consume_next("*2\r\n$4\r\nsome\r\n$4\r\ndata\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item2->done);
BOOST_TEST_EQ(ec, error_code());
// Get the response for the 3rd command in req1
res = mpx.consume_next("+last\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item2->done);
BOOST_TEST_EQ(ec, error_code());
// Get the response for the 2nd request
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
{type::blob_string, 1u, 0u, "Hello world"},
};
check_response(item2->resp, expected);
}
// If an abandoned request receives a NULL or an error, nothing happens
// (regression check)
void test_cancel_written_null_error()
{
// Setup
request req;
req.push("PING", "value1");
req.push("PING", "value2");
req.push("PING", "value3");
multiplexer mpx;
auto item1 = std::make_unique<test_item>(std::move(req));
auto item2 = std::make_unique<test_item>();
mpx.add(item1->elem_ptr);
mpx.add(item2->elem_ptr);
// A write succeeds
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
// Cancel the first request
mpx.cancel(item1->elem_ptr);
item1.reset(); // Verify we don't reference this item anyhow
// The cancelled request's response arrives. It contains NULLs and errors.
// We ignore them
error_code ec;
auto res = mpx.consume_next("-ERR wrong command\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_NOT(item2->done);
res = mpx.consume_next("!3\r\nBad\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_NOT(item2->done);
res = mpx.consume_next("_\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_NOT(item2->done);
// The 2nd request's response arrives. It gets parsed successfully
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
{type::blob_string, 1u, 0u, "Hello world"},
};
check_response(item2->resp, expected);
}
// Cancellation on connection lost
void test_cancel_on_connection_lost()
{
@@ -499,6 +747,57 @@ void test_cancel_on_connection_lost()
BOOST_TEST(item_waiting2.done);
}
// cancel_on_connection_lost cleans up any abandoned request,
// regardless of their configuration
void test_cancel_on_connection_lost_abandoned()
{
// Setup
multiplexer mpx;
auto item_written1 = std::make_unique<test_item>();
auto item_written2 = std::make_unique<test_item>();
auto item_staged1 = std::make_unique<test_item>();
auto item_staged2 = std::make_unique<test_item>();
// Different items have different configurations
// (note that these are all true by default)
item_written1->req.get_config().cancel_if_unresponded = false;
item_staged1->req.get_config().cancel_if_unresponded = false;
// Make each item reach the state it should be in
mpx.add(item_written1->elem_ptr);
mpx.add(item_written2->elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
mpx.add(item_staged1->elem_ptr);
mpx.add(item_staged2->elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
// Check that we got it right
BOOST_TEST(item_written1->elem_ptr->is_written());
BOOST_TEST(item_written2->elem_ptr->is_written());
BOOST_TEST(item_staged1->elem_ptr->is_staged());
BOOST_TEST(item_staged2->elem_ptr->is_staged());
// Cancel all of the requests
mpx.cancel(item_written1->elem_ptr);
mpx.cancel(item_written2->elem_ptr);
mpx.cancel(item_staged1->elem_ptr);
mpx.cancel(item_staged2->elem_ptr);
item_written1.reset();
item_written2.reset();
item_staged1.reset();
item_staged2.reset();
// Trigger a connection lost event
mpx.cancel_on_conn_lost();
// This should have removed all requests, regardless of their config.
// If we restore the connection and try a write, nothing gets written.
mpx.reset();
BOOST_TEST_EQ(mpx.prepare_write(), 0u);
}
// The test below fails. Uncomment when this is fixed:
// https://github.com/boostorg/redis/issues/307
// void test_cancel_on_connection_lost_half_parsed_response()
@@ -533,17 +832,12 @@ void test_cancel_on_connection_lost()
// BOOST_TEST_EQ(ec, error_code());
// // Check the response
// BOOST_TEST(item.resp.has_value());
// const node expected[] = {
// {type::array, 2u, 0u, "" },
// {type::simple_string, 1u, 1u, "hello"},
// {type::simple_string, 1u, 1u, "world"},
// };
// BOOST_TEST_ALL_EQ(
// item.resp->begin(),
// item.resp->end(),
// std::begin(expected),
// std::end(expected));
// check_response(item.resp, expected);
// }
// Resetting works
@@ -580,15 +874,10 @@ void test_reset()
ret = mpx.consume_next(response_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, response_buffer.size());
BOOST_TEST(item2.resp.has_value());
const node expected[] = {
{type::blob_string, 1u, 0u, "Hello world"},
};
BOOST_TEST_ALL_EQ(
item2.resp->begin(),
item2.resp->end(),
std::begin(expected),
std::end(expected));
check_response(item2.resp, expected);
BOOST_TEST(item2.done);
}
@@ -605,7 +894,14 @@ int main()
test_push_heuristics_request_without_response();
test_push_heuristics_request_waiting();
test_mix_responses_pushes();
test_cancel_waiting();
test_cancel_staged();
test_cancel_staged_command_without_response();
test_cancel_written();
test_cancel_written_half_parsed_response();
test_cancel_written_null_error();
test_cancel_on_connection_lost();
test_cancel_on_connection_lost_abandoned();
// test_cancel_on_connection_lost_half_parsed_response();
test_reset();