2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Fixes a race condition when cancelling requests on connection lost (#309)

Changes how cancel_on_conn_lost is used to ensure it is called only once and after the reader and writer tasks have exited.
This fixes a problem in test_conn_reconnect
Adds a test for multiplexer::reset()
Adds stronger invariants to the multiplexer functions to be called by the reader and writer
Removes test_issue_181, since the same functionality is being covered by unit tests already
Removes basic_connection::run_is_canceled
This commit is contained in:
Anarthal (Rubén Pérez)
2025-09-22 13:04:28 +02:00
committed by GitHub
parent 8da18379ba
commit 203e9298ed
7 changed files with 90 additions and 112 deletions

View File

@@ -174,7 +174,6 @@ struct connection_impl {
stream_.close();
writer_timer_.cancel();
receive_channel_.cancel();
mpx_.cancel_on_conn_lost();
}
bool is_open() const noexcept { return stream_.is_open(); }
@@ -462,6 +461,10 @@ public:
// The parallel group result will be translated into a single error
// code by the specialized operator() overload
// We've lost connection or otherwise been cancelled.
// Remove from the multiplexer the required requests.
conn_->mpx_.cancel_on_conn_lost();
// The receive operation must be cancelled because channel
// subscription does not survive a reconnection but requires
// re-subscription.
@@ -860,8 +863,6 @@ public:
*/
void cancel(operation op = operation::all) { impl_->cancel(op); }
auto run_is_canceled() const noexcept { return impl_->mpx_.get_cancel_run_state(); }
/// Returns true if the connection will try to reconnect if an error is encountered.
bool will_reconnect() const noexcept { return impl_->will_reconnect(); }

View File

@@ -116,20 +116,29 @@ public:
std::size_t read_size_;
};
auto remove(std::shared_ptr<elem> const& ptr) -> bool;
// To be called before a write operation. Coalesces all available requests
// into a single buffer. Returns the number of coalesced requests.
// Must be called before cancel_on_conn_lost() because it might change
// request status.
[[nodiscard]]
auto prepare_write() -> std::size_t;
// To be called after a successful write operation.
// Returns the number of requests that have been released because
// they don't have a response e.g. SUBSCRIBE.
// Must be called before cancel_on_conn_lost() because it might change
// request status.
auto commit_write() -> std::size_t;
// To be called after a successful read operation.
// Must be called before cancel_on_conn_lost() because it might change
// request status.
[[nodiscard]]
auto consume_next(std::string_view data, system::error_code& ec)
-> std::pair<consume_result, std::size_t>;
auto add(std::shared_ptr<elem> const& ptr) -> void;
auto remove(std::shared_ptr<elem> const& ptr) -> bool;
auto reset() -> void;
[[nodiscard]]
@@ -138,17 +147,20 @@ public:
return parser_;
}
//[[nodiscard]]
auto cancel_waiting() -> std::size_t;
//[[nodiscard]]
auto cancel_on_conn_lost() -> std::size_t;
[[nodiscard]]
auto get_cancel_run_state() const noexcept -> bool
{
return cancel_run_called_;
}
// To be called exactly once to clean up state after a connection becomes unhealthy.
// Requests are canceled or returned to the waiting state to be re-sent to the server,
// depending on their configuration. After this function is called, prepare_write,
// commit_write and consume_next must not be called until a reset() happens.
// Otherwise, race conditions like the following might happen
// (see https://github.com/boostorg/redis/pull/309 and https://github.com/boostorg/redis/issues/181):
//
// - This function runs and cancels a request, then consume_next runs. It tries to access
// a request and adapter that might have been destroyed.
// - This function runs and returns a request to waiting, then prepare_write runs.
// It incorrectly sets the request state to staged, causing desynchronization between requests and responses.
void cancel_on_conn_lost();
[[nodiscard]]
auto get_write_buffer() noexcept -> std::string_view

View File

@@ -8,6 +8,7 @@
#include <boost/redis/request.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
#include <memory>
@@ -49,6 +50,8 @@ bool multiplexer::remove(std::shared_ptr<elem> const& ptr)
std::size_t multiplexer::commit_write()
{
BOOST_ASSERT(!cancel_run_called_);
// We have to clear the payload right after writing it to use it
// as a flag that informs there is no ongoing write.
write_buffer_.clear();
@@ -130,6 +133,8 @@ std::pair<consume_result, std::size_t> multiplexer::consume_next(
std::string_view data,
system::error_code& ec)
{
BOOST_ASSERT(!cancel_run_called_);
auto const ret = consume_next_impl(data, ec);
auto const consumed = parser_.get_consumed();
if (ec) {
@@ -155,6 +160,8 @@ void multiplexer::reset()
std::size_t multiplexer::prepare_write()
{
BOOST_ASSERT(!cancel_run_called_);
// Coalesces the requests and marks them staged. After a
// successful write staged requests will be marked as written.
auto const point = std::partition_point(
@@ -196,13 +203,12 @@ std::size_t multiplexer::cancel_waiting()
return ret;
}
auto multiplexer::cancel_on_conn_lost() -> std::size_t
void multiplexer::cancel_on_conn_lost()
{
// Protects the code below from being called more than
// once, see https://github.com/boostorg/redis/issues/181
if (std::exchange(cancel_run_called_, true)) {
return 0;
}
// Should only be called once per reconnection.
// See https://github.com/boostorg/redis/issues/181
BOOST_ASSERT(!cancel_run_called_);
cancel_run_called_ = true;
// Must return false if the request should be removed.
auto cond = [](auto const& ptr) {
@@ -217,8 +223,6 @@ auto multiplexer::cancel_on_conn_lost() -> std::size_t
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
auto const ret = std::distance(point, std::end(reqs_));
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->notify_error({asio::error::operation_aborted});
});
@@ -228,8 +232,6 @@ auto multiplexer::cancel_on_conn_lost() -> std::size_t
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return ptr->mark_waiting();
});
return ret;
}
void multiplexer::commit_usage(bool is_push, std::size_t size)

View File

@@ -59,7 +59,6 @@ make_test(test_conn_echo_stress)
make_test(test_conn_move)
make_test(test_conn_setup)
make_test(test_issue_50)
make_test(test_issue_181)
make_test(test_conversions)
make_test(test_conn_tls)
make_test(test_unix_sockets)

View File

@@ -42,7 +42,7 @@ net::awaitable<void> test_reconnect_impl()
// Setting cancel_on_connection_lost to false is required because async_run might detect
// the failure after the 2nd async_exec is issued
request regular_req;
regular_req.push("GET", "mykey");
regular_req.push("PING", "SomeValue");
regular_req.get_config().cancel_on_connection_lost = false;
auto conn = std::make_shared<connection>(ex);
@@ -54,16 +54,14 @@ net::awaitable<void> test_reconnect_impl()
BOOST_TEST_CONTEXT("i=" << i)
{
// Issue a quit request, which will cause the server to close the connection.
// This request will fail
// This request will succeed, since this happens before the connection is lost.
error_code ec;
co_await conn->async_exec(quit_req, ignore, net::redirect_error(ec));
BOOST_TEST(ec == error_code());
// This should trigger reconnection, which will now succeed.
// We should be able to execute requests successfully now.
// TODO: this is currently unreliable - find out why and fix
// Reconnection will happen, and this request will succeed, too.
co_await conn->async_exec(regular_req, ignore, net::redirect_error(ec));
// BOOST_TEST(ec == error_code());
BOOST_TEST(ec == error_code());
}
}

View File

@@ -1,76 +0,0 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/error.hpp>
#include <boost/system/error_code.hpp>
#define BOOST_TEST_MODULE issue_181
#include <boost/test/included/unit_test.hpp>
#include "common.hpp"
#include <chrono>
#include <iostream>
namespace net = boost::asio;
using boost::redis::request;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::logger;
using boost::redis::config;
using boost::redis::operation;
using boost::redis::connection;
using boost::system::error_code;
using namespace std::chrono_literals;
namespace {
// Regression test for https://github.com/boostorg/redis/issues/181.
// Starts async_run on a connection, waits one second on a timer, then cancels
// the run operation and checks that run_is_canceled() flips from false to true
// and that async_run completes with operation_aborted.
BOOST_AUTO_TEST_CASE(issue_181)
{
using basic_connection = boost::redis::basic_connection<net::any_io_executor>;
auto const level = boost::redis::logger::level::debug;
net::io_context ioc;
auto ctx = net::ssl::context{net::ssl::context::tlsv12_client};
basic_connection conn{ioc.get_executor(), std::move(ctx)};
// Fires one second after the test starts; its handler performs the cancellation.
net::steady_timer timer{ioc};
timer.expires_after(std::chrono::seconds{1});
// Set by the async_run completion handler so the test can verify it was invoked.
bool run_finished = false;
// Completion handler for async_run: the run is expected to end only because
// it was cancelled from the timer handler below.
auto run_cont = [&](error_code ec) {
std::cout << "async_run1: " << ec.message() << std::endl;
BOOST_TEST(ec == net::error::operation_aborted);
run_finished = true;
};
auto cfg = make_test_config();
// NOTE(review): zero intervals presumably disable health checks and
// reconnection, so that cancellation is the only way the run ends - confirm
// against the boost::redis::config documentation.
cfg.health_check_interval = std::chrono::seconds{0};
cfg.reconnect_wait_interval = std::chrono::seconds{0};
conn.async_run(cfg, boost::redis::logger{level}, run_cont);
// Nothing has cancelled the run yet.
BOOST_TEST(!conn.run_is_canceled());
// Uses a timer to wait some time until run has been called.
auto timer_cont = [&](error_code ec) {
std::cout << "timer_cont: " << ec.message() << std::endl;
// The timer must expire normally rather than being cancelled.
BOOST_TEST(ec == error_code());
// The cancel state must change exactly at the cancel() call:
// false right before it, true right after it.
BOOST_TEST(!conn.run_is_canceled());
conn.cancel(operation::run);
BOOST_TEST(conn.run_is_canceled());
};
timer.async_wait(timer_cont);
// Bound the event loop with test_timeout so a hang fails the test instead of blocking.
ioc.run_for(test_timeout);
// The async_run handler must have been called before the loop stopped.
BOOST_TEST(run_finished);
}
} // namespace

View File

@@ -497,11 +497,6 @@ void test_cancel_on_connection_lost()
BOOST_TEST(!item_waiting1.done);
BOOST_TEST(item_waiting1.elem_ptr->is_waiting());
BOOST_TEST(item_waiting2.done);
// Triggering it again does nothing
mpx.cancel_on_conn_lost();
BOOST_TEST(!item_written1.done);
BOOST_TEST(item_written1.elem_ptr->is_waiting());
}
// The test below fails. Uncomment when this is fixed:
@@ -551,6 +546,52 @@ void test_cancel_on_connection_lost()
// std::end(expected));
// }
// Resetting works: after a connection loss that leaves a half-parsed push and
// a cancelled request behind, reset() must leave the multiplexer able to
// write a new request and parse its response.
void test_reset()
{
// Setup
multiplexer mpx;
generic_response push_resp;
mpx.set_receive_adapter(any_adapter{push_resp});
test_item item1, item2;
// Add a request
mpx.add(item1.elem_ptr);
// Start parsing a push. The input is deliberately truncated so that the
// multiplexer reports needs_more and is left mid-message.
error_code ec;
auto ret = mpx.consume_next(">2\r", ec);
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
// Connection lost. The first request gets cancelled
mpx.cancel_on_conn_lost();
BOOST_TEST(item1.done);
// Reconnection happens
mpx.reset();
// Start the second phase of the test from a clean error code.
ec.clear();
// We're able to add write requests and read responses - all state was reset
mpx.add(item2.elem_ptr);
// Exactly one request (item2) is coalesced for writing.
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
// NOTE(review): 0 presumably means item2's request expects a response and
// is therefore not released on write - confirm against test_item.
BOOST_TEST_EQ(mpx.commit_write(), 0u);
std::string_view response_buffer = "$11\r\nHello world\r\n";
ret = mpx.consume_next(response_buffer, ec);
// The whole buffer must be consumed as a single, complete response.
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, response_buffer.size());
BOOST_TEST(item2.resp.has_value());
// The response is expected to reach item2 as a single blob_string node.
const node expected[] = {
{type::blob_string, 1u, 0u, "Hello world"},
};
BOOST_TEST_ALL_EQ(
item2.resp->begin(),
item2.resp->end(),
std::begin(expected),
std::end(expected));
BOOST_TEST(item2.done);
}
} // namespace
int main()
@@ -566,6 +607,7 @@ int main()
test_mix_responses_pushes();
test_cancel_on_connection_lost();
// test_cancel_on_connection_lost_half_parsed_response();
test_reset();
return boost::report_errors();
}