2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Makes async_receive2 not cancel on reconnection (#381)

async_receive2 is now only cancelled after calling connection::cancel() or using per-operation cancellation
Adds a restriction to only have one outstanding async_receive2 operation per connection
Adds error::already_running
Adds support for asio::cancel_after for async_receive2
Deprecates cancel(operation::receive)
Adds more documentation to async_receive2

close #331
This commit is contained in:
Anarthal (Rubén Pérez)
2026-01-18 16:14:53 +01:00
committed by GitHub
parent 002b616dd9
commit 89e44dc017
24 changed files with 835 additions and 87 deletions

View File

@@ -46,6 +46,7 @@ make_test(test_writer_fsm)
make_test(test_reader_fsm)
make_test(test_connect_fsm)
make_test(test_sentinel_resolve_fsm)
make_test(test_receive_fsm)
make_test(test_run_fsm)
make_test(test_compose_setup_request)
make_test(test_setup_adapter)

View File

@@ -62,6 +62,7 @@ local tests =
test_writer_fsm
test_reader_fsm
test_sentinel_resolve_fsm
test_receive_fsm
test_run_fsm
test_connect_fsm
test_compose_setup_request

View File

@@ -7,12 +7,14 @@
#include "common.hpp"
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <stdexcept>
#include <string_view>
namespace net = boost::asio;
using namespace std::chrono_literals;
struct run_callback {
std::shared_ptr<boost::redis::connection> conn;
@@ -55,6 +57,7 @@ boost::redis::config make_test_config()
{
boost::redis::config cfg;
cfg.addr.host = get_server_hostname();
cfg.reconnect_wait_interval = 50ms; // make tests involving reconnection faster
return cfg;
}

View File

@@ -93,6 +93,27 @@ void test_receive()
BOOST_TEST(receive_finished);
}
template <class Connection>
void test_receive2()
{
// Setup
asio::io_context ioc;
Connection conn{ioc};
bool receive_finished = false;
generic_response resp;
conn.set_receive_response(resp);
// Call the function with a very short timeout.
conn.async_receive2(asio::cancel_after(1ms, [&](error_code ec) {
BOOST_TEST_EQ(ec, asio::error::operation_aborted);
receive_finished = true;
}));
ioc.run_for(test_timeout);
BOOST_TEST(receive_finished);
}
} // namespace
int main()
@@ -106,5 +127,8 @@ int main()
test_receive<basic_connection<asio::io_context::executor_type>>();
test_receive<connection>();
test_receive2<basic_connection<asio::io_context::executor_type>>();
test_receive2<connection>();
return boost::report_errors();
}

View File

@@ -51,7 +51,6 @@ void test_reconnection()
// Make the test run faster
auto cfg = make_test_config();
cfg.health_check_interval = 500ms;
cfg.reconnect_wait_interval = 100ms;
bool run_finished = false, exec1_finished = false, exec2_finished = false;

View File

@@ -264,9 +264,7 @@ struct test_async_receive_cancelled_on_reconnection_impl {
start_subscribe1();
auto cfg = make_test_config();
cfg.reconnect_wait_interval = 50ms; // make the test run faster
conn.async_run(cfg, [&](error_code ec) {
conn.async_run(make_test_config(), [&](error_code ec) {
run_finished = true;
BOOST_TEST_EQ(ec, net::error::operation_aborted);
});

View File

@@ -5,11 +5,17 @@
*/
#include <boost/redis/connection.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/flat_tree.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/core/lightweight_test.hpp>
@@ -18,6 +24,7 @@
#include <cstddef>
#include <functional>
#include <iostream>
#include <iterator>
#include <set>
#include <string>
@@ -126,6 +133,237 @@ void test_async_receive2_push_available()
BOOST_TEST(run_finished);
}
// async_receive2 blocks only once if several messages are received in a batch
void test_async_receive2_batch()
{
// Setup
net::io_context ioc;
connection conn{ioc};
resp3::flat_tree resp;
conn.set_receive_response(resp);
// Cause two messages to be delivered. The PING ensures that
// the pushes have been read when exec completes
request req;
req.push("SUBSCRIBE", "test_async_receive2_batch");
req.push("SUBSCRIBE", "test_async_receive2_batch");
req.push("PING", "message");
bool receive_finished = false, run_finished = false;
// 1. Trigger pushes
// 2. Receive both of them
// 3. Check that receive2 has consumed them by calling it again
auto on_receive2 = [&](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
receive_finished = true;
conn.cancel();
};
auto on_receive1 = [&](error_code ec) {
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(resp.get_total_msgs(), 2u);
conn.async_receive2(net::cancel_after(50ms, on_receive2));
};
conn.async_exec(req, ignore, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error_code());
conn.async_receive2(on_receive1);
});
conn.async_run(make_test_config(), [&](error_code ec) {
run_finished = true;
BOOST_TEST_EQ(ec, net::error::operation_aborted);
});
ioc.run_for(test_timeout);
BOOST_TEST(receive_finished);
BOOST_TEST(run_finished);
}
// async_receive2 can be called several times in a row
void test_async_receive2_subsequent_calls()
{
struct impl {
net::io_context ioc{};
connection conn{ioc};
resp3::flat_tree resp{};
request req{};
bool receive_finished = false, run_finished = false;
// Send a SUBSCRIBE, which will trigger a push
void start_subscribe1()
{
conn.async_exec(req, ignore, [this](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error_code());
start_receive1();
});
}
// Receive the push
void start_receive1()
{
conn.async_receive2([this](error_code ec) {
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(resp.get_total_msgs(), 1u);
resp.clear();
start_subscribe2();
});
}
// Send another SUBSCRIBE, which will trigger another push
void start_subscribe2()
{
conn.async_exec(req, ignore, [this](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error_code());
start_receive2();
});
}
// End
void start_receive2()
{
conn.async_receive2([this](error_code ec) {
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(resp.get_total_msgs(), 1u);
receive_finished = true;
conn.cancel();
});
}
void run()
{
// Setup
conn.set_receive_response(resp);
req.push("SUBSCRIBE", "test_async_receive2_subsequent_calls");
start_subscribe1();
conn.async_run(make_test_config(), [&](error_code ec) {
run_finished = true;
BOOST_TEST_EQ(ec, net::error::operation_aborted);
});
ioc.run_for(test_timeout);
BOOST_TEST(receive_finished);
BOOST_TEST(run_finished);
}
};
impl{}.run();
}
// async_receive2 can be cancelled using per-operation cancellation,
// and supports all cancellation types
void test_async_receive2_per_operation_cancellation(
std::string_view name,
net::cancellation_type_t type)
{
// Setup
net::io_context ioc;
connection conn{ioc};
net::cancellation_signal sig;
bool receive_finished = false;
conn.async_receive2(net::bind_cancellation_slot(sig.slot(), [&](error_code ec) {
if (!BOOST_TEST_EQ(ec, net::error::operation_aborted))
std::cerr << "With cancellation type " << name << std::endl;
receive_finished = true;
}));
sig.emit(type);
ioc.run_for(test_timeout);
if (!BOOST_TEST(receive_finished))
std::cerr << "With cancellation type " << name << std::endl;
}
// connection::cancel() cancels async_receive2
void test_async_receive2_connection_cancel()
{
// Setup
net::io_context ioc;
connection conn{ioc};
net::cancellation_signal sig;
bool receive_finished = false;
conn.async_receive2([&](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
receive_finished = true;
});
conn.cancel();
ioc.run_for(test_timeout);
BOOST_TEST(receive_finished);
}
// Reconnection doesn't cancel async_receive2
void test_async_receive2_reconnection()
{
// Setup
net::io_context ioc;
connection conn{ioc};
resp3::flat_tree resp;
conn.set_receive_response(resp);
// Causes the reconnection
request req_quit;
req_quit.push("QUIT");
// When this completes, the reconnection has happened
request req_ping;
req_ping.get_config().cancel_if_unresponded = false;
req_ping.push("PING", "test_async_receive2_connection");
// Generates a push
request req_subscribe;
req_subscribe.push("SUBSCRIBE", "test_async_receive2_connection");
bool exec_finished = false, receive_finished = false, run_finished = false;
// Launch a receive operation, and in parallel
// 1. Trigger a reconnection
// 2. Wait for the reconnection and check that receive hasn't been cancelled
// 3. Trigger a push to make receive complete
auto on_subscribe = [&](error_code ec, std::size_t) {
// Will finish before receive2 because the command doesn't have a response
BOOST_TEST_EQ(ec, error_code());
exec_finished = true;
};
auto on_ping = [&](error_code ec, std::size_t) {
// Reconnection has already happened here
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_NOT(receive_finished);
conn.async_exec(req_subscribe, ignore, on_subscribe);
};
conn.async_exec(req_quit, ignore, [&](error_code, std::size_t) {
conn.async_exec(req_ping, ignore, on_ping);
});
conn.async_receive2([&](error_code ec) {
BOOST_TEST_EQ(ec, error_code());
receive_finished = true;
conn.cancel();
});
conn.async_run(make_test_config(), [&](error_code ec) {
run_finished = true;
BOOST_TEST_EQ(ec, net::error::operation_aborted);
});
ioc.run_for(test_timeout);
BOOST_TEST(exec_finished);
BOOST_TEST(receive_finished);
BOOST_TEST(run_finished);
}
// A push may be interleaved between regular responses.
// It is handed to the receive adapter (filtered out).
void test_exec_push_interleaved()
@@ -194,12 +432,12 @@ void test_push_adapter_error()
req.push("SUBSCRIBE", "channel");
req.push("PING");
bool push_received = false, exec_finished = false, run_finished = false;
bool receive_finished = false, exec_finished = false, run_finished = false;
// async_receive2 is cancelled every reconnection cycle
// We cancel receive when run exits
conn.async_receive2([&](error_code ec) {
BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled);
push_received = true;
BOOST_TEST_EQ(ec, net::error::operation_aborted);
receive_finished = true;
});
// The request is cancelled because the PING response isn't processed
@@ -211,13 +449,14 @@ void test_push_adapter_error()
auto cfg = make_test_config();
cfg.reconnect_wait_interval = 0s; // so we can validate the generated error
conn.async_run(cfg, [&run_finished](error_code ec) {
conn.async_run(cfg, [&](error_code ec) {
BOOST_TEST_EQ(ec, error::incompatible_size);
run_finished = true;
conn.cancel();
});
ioc.run_for(test_timeout);
BOOST_TEST(push_received);
BOOST_TEST(receive_finished);
BOOST_TEST(exec_finished);
BOOST_TEST(run_finished);
}
@@ -244,7 +483,7 @@ void test_push_adapter_error_reconnection()
// async_receive2 is cancelled every reconnection cycle
conn.async_receive2([&](error_code ec) {
BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled);
BOOST_TEST_EQ(ec, net::error::operation_aborted);
push_received = true;
});
@@ -262,9 +501,7 @@ void test_push_adapter_error_reconnection()
conn.async_exec(req2, resp, on_exec2);
});
auto cfg = make_test_config();
cfg.reconnect_wait_interval = 50ms; // make the test run faster
conn.async_run(cfg, [&run_finished](error_code ec) {
conn.async_run(make_test_config(), [&run_finished](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
run_finished = true;
});
@@ -276,8 +513,8 @@ void test_push_adapter_error_reconnection()
BOOST_TEST(run_finished);
}
// After an async_receive2 operation finishes, another one can be issued
void test_consecutive_receives()
// Tests the usual push consumer pattern that we recommend in the examples
void test_push_consumer()
{
net::io_context ioc;
connection conn{ioc};
@@ -287,7 +524,7 @@ void test_consecutive_receives()
std::function<void()> launch_push_consumer = [&]() {
conn.async_receive2([&](error_code ec) {
if (ec) {
BOOST_TEST_EQ(ec, net::experimental::error::channel_cancelled);
BOOST_TEST_EQ(ec, net::error::operation_aborted);
push_consumer_finished = true;
resp.clear();
return;
@@ -598,10 +835,17 @@ int main()
{
test_async_receive2_waiting_for_push();
test_async_receive2_push_available();
test_async_receive2_batch();
test_async_receive2_subsequent_calls();
test_async_receive2_per_operation_cancellation("terminal", net::cancellation_type_t::terminal);
test_async_receive2_per_operation_cancellation("partial", net::cancellation_type_t::partial);
test_async_receive2_per_operation_cancellation("total", net::cancellation_type_t::total);
test_async_receive2_connection_cancel();
test_async_receive2_reconnection();
test_exec_push_interleaved();
test_push_adapter_error();
test_push_adapter_error_reconnection();
test_consecutive_receives();
test_push_consumer();
test_unsubscribe();
test_pubsub_state_restoration();

View File

@@ -47,9 +47,7 @@ net::awaitable<void> test_reconnect_impl()
regular_req.get_config().cancel_if_unresponded = false;
auto conn = std::make_shared<connection>(ex);
auto cfg = make_test_config();
cfg.reconnect_wait_interval = 100ms; // make the test run faster
run(conn, std::move(cfg));
run(conn, make_test_config());
for (int i = 0; i < 3; ++i) {
BOOST_TEST_CONTEXT("i=" << i)

View File

@@ -147,8 +147,6 @@ BOOST_AUTO_TEST_CASE(reconnection)
net::io_context ioc;
net::steady_timer timer{ioc};
connection conn{ioc};
auto cfg = make_tls_config();
cfg.reconnect_wait_interval = 10ms; // make the test run faster
request ping_request;
ping_request.push("PING", "some_value");
@@ -161,7 +159,7 @@ BOOST_AUTO_TEST_CASE(reconnection)
bool exec_finished = false, run_finished = false;
// Run the connection
conn.async_run(cfg, {}, [&](error_code ec) {
conn.async_run(make_test_config(), [&](error_code ec) {
run_finished = true;
BOOST_TEST(ec == net::error::operation_aborted);
});

246
test/test_receive_fsm.cpp Normal file
View File

@@ -0,0 +1,246 @@
//
// Copyright (c) 2026 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/receive_fsm.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include <iostream>
#include <string_view>
namespace net = boost::asio;
using namespace boost::redis;
using net::cancellation_type_t;
using boost::system::error_code;
using net::cancellation_type_t;
using detail::receive_action;
using detail::receive_fsm;
using detail::connection_state;
namespace channel_errc = net::experimental::channel_errc;
using action_type = receive_action::action_type;
// Operators
static const char* to_string(action_type type)
{
switch (type) {
case action_type::setup_cancellation: return "setup_cancellation";
case action_type::wait: return "wait";
case action_type::drain_channel: return "drain_channel";
case action_type::immediate: return "immediate";
case action_type::done: return "done";
default: return "<unknown action::type>";
}
}
namespace boost::redis::detail {
std::ostream& operator<<(std::ostream& os, action_type type) { return os << to_string(type); }
bool operator==(const receive_action& lhs, const receive_action& rhs) noexcept
{
return lhs.type == rhs.type && lhs.ec == rhs.ec;
}
std::ostream& operator<<(std::ostream& os, const receive_action& act)
{
os << "action{ .type=" << act.type;
if (act.type == action_type::done)
os << ", ec=" << act.ec;
return os << " }";
}
} // namespace boost::redis::detail
namespace {
struct fixture {
connection_state st;
generic_response resp;
};
void test_success()
{
connection_state st;
receive_fsm fsm;
// Initiate
auto act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::setup_cancellation);
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
// At this point, the operation is now running
BOOST_TEST(st.receive2_running);
// The wait finishes successfully (we were notified). Receive exits
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::drain_channel);
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code());
// The operation is no longer running
BOOST_TEST_NOT(st.receive2_running);
}
// We might see spurious cancels during reconnection (v1 compatibility).
void test_cancelled_reconnection()
{
connection_state st;
receive_fsm fsm;
// Initiate
auto act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::setup_cancellation);
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
// Reconnection happens
act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
BOOST_TEST(st.receive2_running); // still running
// Another reconnection
act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
BOOST_TEST(st.receive2_running); // still running
// The wait finishes successfully (we were notified). Receive exits
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::drain_channel);
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code());
// The operation is no longer running
BOOST_TEST_NOT(st.receive2_running);
}
// We might get cancellations due to connection::cancel()
void test_cancelled_connection_cancel()
{
connection_state st;
receive_fsm fsm;
// Initiate
auto act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::setup_cancellation);
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
// Simulate a connection::cancel()
st.receive2_cancelled = true;
act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(net::error::operation_aborted));
BOOST_TEST_NOT(st.receive2_running);
}
// Operations can still run after connection::cancel()
void test_after_connection_cancel()
{
connection_state st;
receive_fsm fsm;
st.receive2_cancelled = true;
// The operation initiates and runs normally
auto act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::setup_cancellation);
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
BOOST_TEST(st.receive2_running);
// Reconnection behavior not affected
act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
BOOST_TEST(st.receive2_running); // still running
// Simulate a connection::cancel()
st.receive2_cancelled = true;
act = fsm.resume(st, channel_errc::channel_cancelled, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(net::error::operation_aborted));
BOOST_TEST_NOT(st.receive2_running);
}
// Per-operation cancellation is supported
void test_per_operation_cancellation(std::string_view name, cancellation_type_t type)
{
std::cerr << "Running cancellation case " << name << std::endl;
connection_state st;
receive_fsm fsm;
// The operation initiates and runs normally
auto act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::setup_cancellation);
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
BOOST_TEST(st.receive2_running);
// Cancellation is received
act = fsm.resume(st, channel_errc::channel_cancelled, type);
BOOST_TEST_EQ(act, error_code(net::error::operation_aborted));
BOOST_TEST_NOT(st.receive2_running);
}
// Only a single instance of async_receive2 can be running at the same time
void test_error_already_running()
{
connection_state st;
receive_fsm fsm;
st.receive2_running = true;
// The operation fails immediately
auto act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::immediate);
BOOST_TEST(st.receive2_running); // not affected
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::already_running));
BOOST_TEST(st.receive2_running); // not affected
}
// If an unknown error was obtained during channel receive, we propagate it
void test_error_unknown()
{
connection_state st;
receive_fsm fsm;
// Initiate
auto act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::setup_cancellation);
act = fsm.resume(st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action_type::wait);
BOOST_TEST(st.receive2_running);
// We have an unknown error
act = fsm.resume(st, channel_errc::channel_closed, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(channel_errc::channel_closed));
BOOST_TEST_NOT(st.receive2_running);
}
} // namespace
int main()
{
test_success();
test_cancelled_reconnection();
test_cancelled_connection_cancel();
test_after_connection_cancel();
test_per_operation_cancellation("terminal", cancellation_type_t::terminal);
test_per_operation_cancellation("partial", cancellation_type_t::partial);
test_per_operation_cancellation("total", cancellation_type_t::total);
test_per_operation_cancellation("all", cancellation_type_t::all);
test_error_already_running();
test_error_unknown();
return boost::report_errors();
}

View File

@@ -78,7 +78,6 @@ void test_reconnection()
connection conn{ioc};
auto cfg = make_test_config();
cfg.unix_socket = unix_socket_path;
cfg.reconnect_wait_interval = 10ms; // make the test run faster
request ping_request;
ping_request.get_config().cancel_if_not_connected = false;