2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Moves the setup request execution to run_fsm (#333)

Adds unit tests to cover setup request execution in run_fsm
Entails no functional change
This commit is contained in:
Anarthal (Rubén Pérez)
2025-10-20 15:56:46 +02:00
committed by GitHub
parent 2b09ecbd78
commit 6be0d122fb
11 changed files with 156 additions and 127 deletions

View File

@@ -39,6 +39,7 @@
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/cancellation_condition.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/immediate.hpp>
@@ -286,48 +287,12 @@ public:
}
};
system::error_code translate_parallel_group_errors(
std::array<std::size_t, 3u> order,
system::error_code setup_ec,
system::error_code reader_ec,
system::error_code writer_ec);
template <class Executor>
class run_op {
private:
connection_impl<Executor>* conn_;
run_fsm fsm_{};
static system::error_code on_setup_finished(
connection_impl<Executor>& conn,
system::error_code ec)
{
ec = check_setup_response(ec, conn.st_.setup_resp);
conn.st_.logger.on_setup(ec, conn.st_.setup_resp);
return ec;
}
template <class CompletionToken>
auto send_setup(CompletionToken&& token)
{
// clang-format off
// Skip sending the setup request if it's empty
return asio::deferred_t::when(conn_->st_.cfg.setup.get_commands() != 0u)
.then(
conn_->async_exec(
conn_->st_.cfg.setup,
any_adapter(conn_->st_.setup_resp),
asio::deferred([&conn = *this->conn_](system::error_code ec, std::size_t) {
return asio::deferred.values(on_setup_finished(conn, ec));
})
)
)
.otherwise(asio::deferred.values(system::error_code()))
(std::forward<CompletionToken>(token))
;
// clang-format on
}
template <class CompletionToken>
auto reader(CompletionToken&& token)
{
@@ -355,12 +320,11 @@ public:
template <class Self>
void operator()(
Self& self,
std::array<std::size_t, 3> order,
system::error_code setup_ec,
std::array<std::size_t, 2u> order,
system::error_code reader_ec,
system::error_code writer_ec)
{
(*this)(self, translate_parallel_group_errors(order, setup_ec, reader_ec, writer_ec));
(*this)(self, order[0u] == 0u ? reader_ec : writer_ec);
}
template <class Self>
@@ -377,21 +341,14 @@ public:
conn_->stream_.async_connect(conn_->st_.cfg, conn_->st_.logger, std::move(self));
return;
case run_action_type::parallel_group:
// Note: Order is important here because the writer might
// trigger an async_write before the setup request is sent,
// causing other requests to be sent before the setup request,
// violating the setup request contract.
asio::experimental::make_parallel_group(
[this](auto token) {
return this->send_setup(token);
},
[this](auto token) {
return this->reader(token);
},
[this](auto token) {
return this->writer(token);
})
.async_wait(asio::experimental::wait_for_one_error(), std::move(self));
.async_wait(asio::experimental::wait_for_one(), std::move(self));
return;
case run_action_type::cancel_receive:
conn_->receive_channel_.cancel();

View File

@@ -8,7 +8,6 @@
#define BOOST_REDIS_CONNECTION_LOGGER_HPP
#include <boost/redis/logger.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/system/error_code.hpp>
@@ -38,7 +37,7 @@ public:
void on_ssl_handshake(system::error_code const& ec);
void on_write(std::size_t n);
void on_read(system::error_code const& ec, std::size_t n);
void on_setup(system::error_code const& ec, generic_response const& resp);
void on_setup(system::error_code const& ec, std::string_view diagnostic);
void log(logger::level lvl, std::string_view msg);
void log(logger::level lvl, std::string_view msg1, std::string_view msg2);
void log(logger::level lvl, std::string_view op, system::error_code const& ec);

View File

@@ -15,6 +15,8 @@
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <string>
namespace boost::redis::detail {
// Contains all the members in connection that don't depend on the Executor.
@@ -23,7 +25,7 @@ struct connection_state {
connection_logger logger;
config cfg{};
multiplexer mpx{};
generic_response setup_resp{};
std::string setup_diagnostic{};
request ping_req{};
};

View File

@@ -20,9 +20,6 @@ void compose_setup_request(config& cfg);
// Completely clears a response
void clear_response(generic_response& res);
// Checks that the response to the setup request was successful
system::error_code check_setup_response(system::error_code io_ec, const generic_response&);
} // namespace boost::redis::detail
#endif // BOOST_REDIS_RUNNER_HPP

View File

@@ -21,31 +21,6 @@ logger detail::make_stderr_logger(logger::level lvl, std::string prefix)
});
}
system::error_code detail::translate_parallel_group_errors(
std::array<std::size_t, 3u> order,
system::error_code setup_ec,
system::error_code reader_ec,
system::error_code writer_ec)
{
// The setup request is special: it might complete successfully,
// without causing the other tasks to exit.
// The other tasks will always complete with an error.
// If the setup task errored and was the first to exit, use its code
if (order[0] == 0u && setup_ec) {
return setup_ec;
}
// Use the code corresponding to the task that finished first,
// excluding the setup task
std::size_t task_number = order[0] == 0u ? order[1] : order[0];
switch (task_number) {
case 1u: return reader_ec;
case 2u: return writer_ec;
default: BOOST_ASSERT(false); return system::error_code();
}
}
connection::connection(executor_type ex, asio::ssl::context ctx, logger lgr)
: impl_{std::move(ex), std::move(ctx), std::move(lgr)}
{ }

View File

@@ -142,7 +142,7 @@ void connection_logger::on_read(system::error_code const& ec, std::size_t bytes_
logger_.fn(logger::level::debug, msg_);
}
void connection_logger::on_setup(system::error_code const& ec, generic_response const& resp)
void connection_logger::on_setup(system::error_code const& ec, std::string_view diagnostic)
{
if (logger_.lvl < logger::level::info)
return;
@@ -150,9 +150,9 @@ void connection_logger::on_setup(system::error_code const& ec, generic_response
msg_ = "Setup request execution: ";
if (ec) {
format_error_code(ec, msg_);
if (resp.has_error()) {
if (!diagnostic.empty()) {
msg_ += " (";
msg_ += resp.error().diagnostic;
msg_ += diagnostic;
msg_ += ')';
}
} else {

View File

@@ -45,7 +45,8 @@ struct error_category_impl : system::error_category {
return "Can't receive server push synchronously without blocking.";
case error::incompatible_node_depth: return "Incompatible node depth.";
case error::resp3_hello:
return "The setup request sent during connection establishment failed.";
return "The server response to the setup request sent during connection establishment "
"contains an error.";
case error::unix_sockets_unsupported:
return "The configuration specified a UNIX socket address, but UNIX sockets are not "
"supported by the system.";

View File

@@ -6,8 +6,11 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/run_fsm.hpp>
#include <boost/redis/detail/setup_request_utils.hpp>
#include <boost/redis/impl/is_terminal_cancel.hpp>
@@ -37,6 +40,36 @@ inline void compose_ping_request(const config& cfg, request& to)
to.push("PING", cfg.health_check_id);
}
inline void process_setup_node(
connection_state& st,
resp3::basic_node<std::string_view> const& nd,
system::error_code& ec)
{
switch (nd.data_type) {
case resp3::type::simple_error:
case resp3::type::blob_error:
case resp3::type::null:
ec = redis::error::resp3_hello;
st.setup_diagnostic = nd.value;
break;
default:;
}
}
inline any_adapter make_setup_adapter(connection_state& st)
{
return any_adapter{
[&st](any_adapter::parse_event evt, resp3::node_view const& nd, system::error_code& ec) {
if (evt == any_adapter::parse_event::node)
process_setup_node(st, nd, ec);
}};
}
inline void on_setup_done(const multiplexer::elem& elm, connection_state& st)
{
st.logger.on_setup(elm.get_error(), st.setup_diagnostic);
}
run_action run_fsm::resume(
connection_state& st,
system::error_code ec,
@@ -74,7 +107,16 @@ run_action run_fsm::resume(
if (!ec) {
// Initialization
st.mpx.reset();
clear_response(st.setup_resp);
st.setup_diagnostic.clear();
// Add the setup request to the multiplexer
if (st.cfg.setup.get_commands() != 0u) {
auto elm = make_elem(st.cfg.setup, make_setup_adapter(st));
elm->set_done_callback([&elem_ref = *elm, &st] {
on_setup_done(elem_ref, st);
});
st.mpx.add(elm);
}
// Run the tasks
BOOST_REDIS_YIELD(resume_point_, 3, run_action_type::parallel_group)

View File

@@ -54,15 +54,4 @@ void clear_response(generic_response& res)
res.emplace();
}
system::error_code check_setup_response(system::error_code io_ec, const generic_response& resp)
{
if (io_ec)
return io_ec;
if (resp.has_error())
return error::resp3_hello;
return system::error_code();
}
} // namespace boost::redis::detail

View File

@@ -8,6 +8,7 @@
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/run_fsm.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
@@ -76,14 +77,22 @@ struct fixture : detail::log_fixture {
detail::connection_state st;
run_fsm fsm;
fixture(config&& cfg = {})
static config default_config()
{
config res;
res.use_setup = true;
res.setup.clear();
return res;
}
fixture(config&& cfg = default_config())
: st{make_logger(), std::move(cfg)}
{ }
};
config config_no_reconnect()
{
config res;
auto res = fixture::default_config();
res.reconnect_wait_interval = 0s;
return res;
}
@@ -446,6 +455,91 @@ void test_setup_ping_requests()
BOOST_TEST_EQ(fix.st.cfg.setup.payload(), expected_setup);
}
// We correctly send and log the setup request
void test_setup_request_success()
{
// Setup
fixture fix;
fix.st.cfg.setup.clear();
fix.st.cfg.setup.push("HELLO", 3);
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// At this point, the setup request should be already queued. Simulate the writer
BOOST_TEST_EQ(fix.st.mpx.prepare_write(), 1u);
BOOST_TEST(fix.st.mpx.commit_write(fix.st.mpx.get_write_buffer().size()));
// Simulate a successful read
read(fix.st.mpx, "+OK\r\n");
error_code ec;
auto res = fix.st.mpx.consume(ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST(res.first == detail::consume_result::got_response);
// Check log
fix.check_log({
{logger::level::info, "Setup request execution: success"}
});
}
// We don't send empty setup requests
void test_setup_request_empty()
{
// Setup
fixture fix;
fix.st.cfg.setup.clear();
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// Nothing was added to the multiplexer
BOOST_TEST_EQ(fix.st.mpx.prepare_write(), 0u);
// Check log
fix.check_log({});
}
// A server error would cause the reader to exit
void test_setup_request_server_error()
{
// Setup
fixture fix;
fix.st.setup_diagnostic = "leftover"; // simulate a leftover from previous runs
fix.st.cfg.setup.clear();
fix.st.cfg.setup.push("HELLO", 3);
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// At this point, the setup request should be already queued. Simulate the writer
BOOST_TEST_EQ(fix.st.mpx.prepare_write(), 1u);
BOOST_TEST(fix.st.mpx.commit_write(fix.st.mpx.get_write_buffer().size()));
// Simulate a successful read
read(fix.st.mpx, "-ERR: wrong command\r\n");
error_code ec;
auto res = fix.st.mpx.consume(ec);
BOOST_TEST_EQ(ec, error::resp3_hello);
BOOST_TEST(res.first == detail::consume_result::got_response);
// Check log
fix.check_log({
{logger::level::info,
"Setup request execution: The server response to the setup request sent during connection "
"establishment contains an error. [boost.redis:23] (ERR: wrong command)"}
});
}
} // namespace
int main()
@@ -471,5 +565,9 @@ int main()
test_several_reconnections();
test_setup_ping_requests();
test_setup_request_success();
test_setup_request_empty();
test_setup_request_server_error();
return boost::report_errors();
}

View File

@@ -22,7 +22,6 @@ namespace asio = boost::asio;
namespace redis = boost::redis;
using redis::detail::compose_setup_request;
using redis::detail::clear_response;
using redis::detail::check_setup_response;
using boost::system::error_code;
namespace {
@@ -174,32 +173,6 @@ void test_clear_response_error()
BOOST_TEST_EQ(resp.value().size(), 0u);
}
// check response
void test_check_response_success()
{
redis::generic_response resp;
resp->push_back({});
auto ec = check_setup_response(error_code(), resp);
BOOST_TEST_EQ(ec, error_code());
}
void test_check_response_io_error()
{
redis::generic_response resp;
auto ec = check_setup_response(asio::error::already_open, resp);
BOOST_TEST_EQ(ec, asio::error::already_open);
}
void test_check_response_server_error()
{
redis::generic_response resp{
boost::system::in_place_error,
redis::adapter::error{redis::resp3::type::simple_error, "wrong password"}
};
auto ec = check_setup_response(error_code(), resp);
BOOST_TEST_EQ(ec, redis::error::resp3_hello);
}
} // namespace
int main()
@@ -217,9 +190,5 @@ int main()
test_clear_response_nonempty();
test_clear_response_error();
test_check_response_success();
test_check_response_io_error();
test_check_response_server_error();
return boost::report_errors();
}