2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Implements async_run as a FSM and adds tests (#330)

* Implements async_run as a FSM and adds tests
* Places all sans-io variables in connection_impl in a connection_state struct

Entails no functional change.
This commit is contained in:
Anarthal (Rubén Pérez)
2025-10-13 22:19:39 +02:00
committed by GitHub
parent 28ed27ce72
commit f683e368dd
12 changed files with 794 additions and 169 deletions

View File

@@ -11,12 +11,14 @@
#include <boost/redis/adapter/any_adapter.hpp> #include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/config.hpp> #include <boost/redis/config.hpp>
#include <boost/redis/detail/connection_logger.hpp> #include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/exec_fsm.hpp> #include <boost/redis/detail/exec_fsm.hpp>
#include <boost/redis/detail/helper.hpp> #include <boost/redis/detail/helper.hpp>
#include <boost/redis/detail/multiplexer.hpp> #include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/reader_fsm.hpp> #include <boost/redis/detail/reader_fsm.hpp>
#include <boost/redis/detail/redis_stream.hpp> #include <boost/redis/detail/redis_stream.hpp>
#include <boost/redis/detail/resp3_type_to_error.hpp> #include <boost/redis/detail/resp3_type_to_error.hpp>
#include <boost/redis/detail/run_fsm.hpp>
#include <boost/redis/detail/setup_request_utils.hpp> #include <boost/redis/detail/setup_request_utils.hpp>
#include <boost/redis/detail/writer_fsm.hpp> #include <boost/redis/detail/writer_fsm.hpp>
#include <boost/redis/error.hpp> #include <boost/redis/error.hpp>
@@ -87,13 +89,7 @@ struct connection_impl {
timer_type ping_timer_; // to wait between pings timer_type ping_timer_; // to wait between pings
receive_channel_type receive_channel_; receive_channel_type receive_channel_;
asio::cancellation_signal run_signal_; asio::cancellation_signal run_signal_;
connection_state st_;
config cfg_;
multiplexer mpx_;
connection_logger logger_;
generic_response setup_resp_;
request ping_req_;
generic_response ping_resp_;
using executor_type = Executor; using executor_type = Executor;
@@ -140,7 +136,7 @@ struct connection_impl {
, reconnect_timer_{ex} , reconnect_timer_{ex}
, ping_timer_{ex} , ping_timer_{ex}
, receive_channel_{ex, 256} , receive_channel_{ex, 256}
, logger_{std::move(lgr)} , st_{std::move(lgr)}
{ {
set_receive_adapter(any_adapter{ignore}); set_receive_adapter(any_adapter{ignore});
writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
@@ -149,10 +145,10 @@ struct connection_impl {
void cancel(operation op) void cancel(operation op)
{ {
switch (op) { switch (op) {
case operation::exec: mpx_.cancel_waiting(); break; case operation::exec: st_.mpx.cancel_waiting(); break;
case operation::receive: receive_channel_.cancel(); break; case operation::receive: receive_channel_.cancel(); break;
case operation::reconnection: case operation::reconnection:
cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero();
break; break;
case operation::run: case operation::run:
case operation::resolve: case operation::resolve:
@@ -160,10 +156,10 @@ struct connection_impl {
case operation::ssl_handshake: case operation::ssl_handshake:
case operation::health_check: cancel_run(); break; case operation::health_check: cancel_run(); break;
case operation::all: case operation::all:
mpx_.cancel_waiting(); // exec st_.mpx.cancel_waiting(); // exec
receive_channel_.cancel(); // receive receive_channel_.cancel(); // receive
cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect st_.cfg.reconnect_wait_interval = std::chrono::seconds::zero(); // reconnect
cancel_run(); // run cancel_run(); // run
break; break;
default: /* ignore */; default: /* ignore */;
} }
@@ -188,7 +184,7 @@ struct connection_impl {
bool will_reconnect() const noexcept bool will_reconnect() const noexcept
{ {
return cfg_.reconnect_wait_interval != std::chrono::seconds::zero(); return st_.cfg.reconnect_wait_interval != std::chrono::seconds::zero();
} }
template <class CompletionToken> template <class CompletionToken>
@@ -202,12 +198,15 @@ struct connection_impl {
}); });
return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>( return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
exec_op{this, notifier, exec_fsm(mpx_, std::move(info))}, exec_op{this, notifier, exec_fsm(st_.mpx, std::move(info))},
token, token,
writer_timer_); writer_timer_);
} }
void set_receive_adapter(any_adapter adapter) { mpx_.set_receive_adapter(std::move(adapter)); } void set_receive_adapter(any_adapter adapter)
{
st_.mpx.set_receive_adapter(std::move(adapter));
}
}; };
template <class Executor> template <class Executor>
@@ -217,7 +216,7 @@ struct writer_op {
explicit writer_op(connection_impl<Executor>& conn) noexcept explicit writer_op(connection_impl<Executor>& conn) noexcept
: conn_(&conn) : conn_(&conn)
, fsm_(conn.mpx_, conn.logger_) , fsm_(conn.st_.mpx, conn.st_.logger)
{ } { }
template <class Self> template <class Self>
@@ -230,7 +229,7 @@ struct writer_op {
case writer_action_type::write: case writer_action_type::write:
asio::async_write( asio::async_write(
conn_->stream_, conn_->stream_,
asio::buffer(conn_->mpx_.get_write_buffer()), asio::buffer(conn_->st_.mpx.get_write_buffer()),
std::move(self)); std::move(self));
return; return;
case writer_action_type::wait: conn_->writer_timer_.async_wait(std::move(self)); return; case writer_action_type::wait: conn_->writer_timer_.async_wait(std::move(self)); return;
@@ -246,7 +245,7 @@ struct reader_op {
public: public:
reader_op(connection_impl<Executor>& conn) noexcept reader_op(connection_impl<Executor>& conn) noexcept
: conn_{&conn} : conn_{&conn}
, fsm_{conn.mpx_} , fsm_{conn.st_.mpx}
{ } { }
template <class Self> template <class Self>
@@ -255,13 +254,13 @@ public:
for (;;) { for (;;) {
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
conn_->logger_.on_fsm_resume(act); conn_->st_.logger.on_fsm_resume(act);
switch (act.type_) { switch (act.type_) {
case reader_fsm::action::type::needs_more: case reader_fsm::action::type::needs_more:
case reader_fsm::action::type::read_some: case reader_fsm::action::type::read_some:
{ {
auto const buf = conn_->mpx_.get_prepared_read_buffer(); auto const buf = conn_->st_.mpx.get_prepared_read_buffer();
conn_->stream_.async_read_some(asio::buffer(buf), std::move(self)); conn_->stream_.async_read_some(asio::buffer(buf), std::move(self));
} }
return; return;
@@ -288,26 +287,26 @@ struct health_checker_op {
{ {
// Did we have a cancellation? We might not have an error code here // Did we have a cancellation? We might not have an error code here
if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) { if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) {
conn_->logger_.log(logger::level::info, "Health checker: cancelled"); conn_->st_.logger.log(logger::level::info, "Health checker: cancelled");
return asio::error::operation_aborted; return asio::error::operation_aborted;
} }
// operation_aborted and no cancel state means that asio::cancel_after timed out // operation_aborted and no cancel state means that asio::cancel_after timed out
if (io_ec == asio::error::operation_aborted) { if (io_ec == asio::error::operation_aborted) {
conn_->logger_.log(logger::level::info, "Health checker: ping timed out"); conn_->st_.logger.log(logger::level::info, "Health checker: ping timed out");
return error::pong_timeout; return error::pong_timeout;
} }
// Did we have other unknown error? // Did we have other unknown error?
if (io_ec) { if (io_ec) {
conn_->logger_.log(logger::level::info, "Health checker: ping error", io_ec); conn_->st_.logger.log(logger::level::info, "Health checker: ping error", io_ec);
return io_ec; return io_ec;
} }
// Did the server answer with an error? // Did the server answer with an error?
if (conn_->ping_resp_.has_error()) { if (conn_->st_.ping_resp.has_error()) {
auto error = conn_->ping_resp_.error(); auto error = conn_->st_.ping_resp.error();
conn_->logger_.log( conn_->st_.logger.log(
logger::level::info, logger::level::info,
"Health checker: server answered ping with an error", "Health checker: server answered ping with an error",
error.diagnostic); error.diagnostic);
@@ -328,8 +327,8 @@ public:
{ {
BOOST_ASIO_CORO_REENTER(coro_) BOOST_ASIO_CORO_REENTER(coro_)
{ {
if (conn_->cfg_.health_check_interval == std::chrono::seconds::zero()) { if (conn_->st_.cfg.health_check_interval == std::chrono::seconds::zero()) {
conn_->logger_.trace("ping_op (1): timeout disabled."); conn_->st_.logger.trace("ping_op (1): timeout disabled.");
// Wait until we're cancelled. This simplifies parallel group handling a lot // Wait until we're cancelled. This simplifies parallel group handling a lot
conn_->ping_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); conn_->ping_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
@@ -340,16 +339,16 @@ public:
for (;;) { for (;;) {
// Clean up any previous leftover // Clean up any previous leftover
clear_response(conn_->ping_resp_); clear_response(conn_->st_.ping_resp);
// Execute the request // Execute the request
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
{ {
auto* conn = conn_; // avoid use-after-move problems auto* conn = conn_; // avoid use-after-move problems
auto timeout = conn->cfg_.health_check_interval; auto timeout = conn->st_.cfg.health_check_interval;
conn->async_exec( conn->async_exec(
conn->ping_req_, conn->st_.ping_req,
any_adapter{conn->ping_resp_}, any_adapter{conn->st_.ping_resp},
asio::cancel_after(conn->ping_timer_, timeout, std::move(self))); asio::cancel_after(conn->ping_timer_, timeout, std::move(self)));
} }
@@ -361,13 +360,13 @@ public:
} }
// Wait before pinging again. // Wait before pinging again.
conn_->ping_timer_.expires_after(conn_->cfg_.health_check_interval); conn_->ping_timer_.expires_after(conn_->st_.cfg.health_check_interval);
BOOST_ASIO_CORO_YIELD BOOST_ASIO_CORO_YIELD
conn_->ping_timer_.async_wait(std::move(self)); conn_->ping_timer_.async_wait(std::move(self));
if (is_cancelled(self)) { if (is_cancelled(self)) {
conn_->logger_.trace("ping_op (2): cancelled"); conn_->st_.logger.trace("ping_op (2): cancelled");
self.complete(asio::error::operation_aborted); self.complete(asio::error::operation_aborted);
return; return;
} }
@@ -376,24 +375,6 @@ public:
} }
}; };
inline void compose_ping_request(const config& cfg, request& to)
{
to.clear();
to.push("PING", cfg.health_check_id);
}
inline system::error_code check_config(const config& cfg)
{
if (!cfg.unix_socket.empty()) {
#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
return error::unix_sockets_unsupported;
#endif
if (cfg.use_ssl)
return error::unix_sockets_ssl_unsupported;
}
return system::error_code{};
}
system::error_code translate_parallel_group_errors( system::error_code translate_parallel_group_errors(
std::array<std::size_t, 4u> order, std::array<std::size_t, 4u> order,
system::error_code setup_ec, system::error_code setup_ec,
@@ -404,16 +385,15 @@ system::error_code translate_parallel_group_errors(
template <class Executor> template <class Executor>
class run_op { class run_op {
private: private:
connection_impl<Executor>* conn_ = nullptr; connection_impl<Executor>* conn_;
asio::coroutine coro_{}; run_fsm fsm_{};
system::error_code stored_ec_;
static system::error_code on_setup_finished( static system::error_code on_setup_finished(
connection_impl<Executor>& conn, connection_impl<Executor>& conn,
system::error_code ec) system::error_code ec)
{ {
ec = check_setup_response(ec, conn.setup_resp_); ec = check_setup_response(ec, conn.st_.setup_resp);
conn.logger_.on_setup(ec, conn.setup_resp_); conn.st_.logger.on_setup(ec, conn.st_.setup_resp);
return ec; return ec;
} }
@@ -422,11 +402,11 @@ private:
{ {
// clang-format off // clang-format off
// Skip sending the setup request if it's empty // Skip sending the setup request if it's empty
return asio::deferred_t::when(conn_->cfg_.setup.get_commands() != 0u) return asio::deferred_t::when(conn_->st_.cfg.setup.get_commands() != 0u)
.then( .then(
conn_->async_exec( conn_->async_exec(
conn_->cfg_.setup, conn_->st_.cfg.setup,
any_adapter(conn_->setup_resp_), any_adapter(conn_->st_.setup_resp),
asio::deferred([&conn = *this->conn_](system::error_code ec, std::size_t) { asio::deferred([&conn = *this->conn_](system::error_code ec, std::size_t) {
return asio::deferred.values(on_setup_finished(conn, ec)); return asio::deferred.values(on_setup_finished(conn, ec));
}) })
@@ -488,102 +468,45 @@ public:
template <class Self> template <class Self>
void operator()(Self& self, system::error_code ec = {}) void operator()(Self& self, system::error_code ec = {})
{ {
BOOST_ASIO_CORO_REENTER(coro_) auto act = fsm_.resume(conn_->st_, ec, self.get_cancellation_state().cancelled());
{
// Check config switch (act.type) {
ec = check_config(conn_->cfg_); case run_action_type::done: self.complete(act.ec); return;
if (ec) { case run_action_type::immediate:
conn_->logger_.log(logger::level::err, "Invalid configuration", ec); asio::async_immediate(self.get_io_executor(), std::move(self));
stored_ec_ = ec;
BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self));
self.complete(stored_ec_);
return; return;
} case run_action_type::connect:
conn_->stream_.async_connect(conn_->st_.cfg, conn_->st_.logger, std::move(self));
// Compose the setup request. This only depends on the config, so it can be done just once return;
compose_setup_request(conn_->cfg_); case run_action_type::parallel_group:
// Note: Order is important here because the writer might
// Compose the PING request. Same as above // trigger an async_write before the setup request is sent,
compose_ping_request(conn_->cfg_, conn_->ping_req_); // causing other requests to be sent before the setup request,
// violating the setup request contract.
for (;;) { asio::experimental::make_parallel_group(
// Try to connect [this](auto token) {
BOOST_ASIO_CORO_YIELD return this->send_setup(token);
conn_->stream_.async_connect(conn_->cfg_, conn_->logger_, std::move(self)); },
[this](auto token) {
// Check for cancellations return this->health_checker(token);
if (is_cancelled(self)) { },
self.complete(asio::error::operation_aborted); [this](auto token) {
return; return this->reader(token);
} },
[this](auto token) {
// If we were successful, run all the connection tasks return this->writer(token);
if (!ec) { })
conn_->mpx_.reset(); .async_wait(asio::experimental::wait_for_one_error(), std::move(self));
clear_response(conn_->setup_resp_); return;
case run_action_type::cancel_receive:
// Note: Order is important here because the writer might conn_->receive_channel_.cancel();
// trigger an async_write before the setup request is sent, (*this)(self); // this action does not require suspending
// causing other requests to be sent before the setup request, return;
// violating the setup request contract. case run_action_type::wait_for_reconnection:
BOOST_ASIO_CORO_YIELD conn_->reconnect_timer_.expires_after(conn_->st_.cfg.reconnect_wait_interval);
asio::experimental::make_parallel_group(
[this](auto token) {
return this->send_setup(token);
},
[this](auto token) {
return this->health_checker(token);
},
[this](auto token) {
return this->reader(token);
},
[this](auto token) {
return this->writer(token);
})
.async_wait(asio::experimental::wait_for_one_error(), std::move(self));
// The parallel group result will be translated into a single error
// code by the specialized operator() overload
// We've lost connection or otherwise been cancelled.
// Remove from the multiplexer the required requests.
conn_->mpx_.cancel_on_conn_lost();
// The receive operation must be cancelled because channel
// subscription does not survive a reconnection but requires
// re-subscription.
conn_->receive_channel_.cancel();
}
// If we are not going to try again, we're done
if (!conn_->will_reconnect()) {
self.complete(ec);
return;
}
// Check for cancellations
if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
// Wait for the reconnection interval
conn_->reconnect_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
BOOST_ASIO_CORO_YIELD
conn_->reconnect_timer_.async_wait(std::move(self)); conn_->reconnect_timer_.async_wait(std::move(self));
return;
// Check for cancellations default: BOOST_ASSERT(false);
if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
// If we won't reconnect, exit
if (!conn_->will_reconnect()) {
self.complete(asio::error::operation_aborted);
return;
}
}
} }
} }
}; };
@@ -1076,7 +999,7 @@ public:
} }
/// Returns connection usage information. /// Returns connection usage information.
usage get_usage() const noexcept { return impl_->mpx_.get_usage(); } usage get_usage() const noexcept { return impl_->st_.mpx.get_usage(); }
private: private:
using clock_type = std::chrono::steady_clock; using clock_type = std::chrono::steady_clock;
@@ -1092,7 +1015,7 @@ private:
// Used by both this class and connection // Used by both this class and connection
void set_stderr_logger(logger::level lvl, const config& cfg) void set_stderr_logger(logger::level lvl, const config& cfg)
{ {
impl_->logger_.reset(detail::make_stderr_logger(lvl, cfg.log_prefix)); impl_->st_.logger.reset(detail::make_stderr_logger(lvl, cfg.log_prefix));
} }
// Initiation for async_run. This is required because we need access // Initiation for async_run. This is required because we need access
@@ -1107,8 +1030,8 @@ private:
template <class Handler> template <class Handler>
void operator()(Handler&& handler, config const* cfg) void operator()(Handler&& handler, config const* cfg)
{ {
self->cfg_ = *cfg; self->st_.cfg = *cfg;
self->mpx_.set_config(*cfg); self->st_.mpx.set_config(*cfg);
// If the token's slot has cancellation enabled, it should just emit // If the token's slot has cancellation enabled, it should just emit
// the cancellation signal in our connection. This lets us unify the cancel() // the cancellation signal in our connection. This lets us unify the cancel()

View File

@@ -0,0 +1,33 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_REDIS_CONNECTION_STATE_HPP
#define BOOST_REDIS_CONNECTION_STATE_HPP
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
namespace boost::redis::detail {
// Contains all the members in connection that don't depend on the Executor.
// Makes implementing sans-io algorithms easier
struct connection_state {
connection_logger logger;
config cfg{};
multiplexer mpx{};
generic_response setup_resp{};
request ping_req{};
generic_response ping_resp{};
};
} // namespace boost::redis::detail
#endif // BOOST_REDIS_CONNECTOR_HPP

View File

@@ -0,0 +1,65 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_REDIS_RUN_FSM_HPP
#define BOOST_REDIS_RUN_FSM_HPP
#include <boost/redis/detail/connection_state.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/system/error_code.hpp>
// Sans-io algorithm for async_run, as a finite state machine
namespace boost::redis::detail {
// Forward decls
class connection_logger;
class multiplexer;
// What should we do next?
enum class run_action_type
{
done, // Call the final handler
immediate, // Call asio::async_immediate
connect, // Transport connection establishment
parallel_group, // Run the reader, writer and friends
cancel_receive, // Cancel the receiver channel
wait_for_reconnection, // Sleep for the reconnection period
};
struct run_action {
run_action_type type;
system::error_code ec;
run_action(run_action_type type) noexcept
: type{type}
{ }
run_action(system::error_code ec) noexcept
: type{run_action_type::done}
, ec{ec}
{ }
};
class run_fsm {
int resume_point_{0};
system::error_code stored_ec_;
public:
run_fsm() = default;
run_action resume(
connection_state& st,
system::error_code ec,
asio::cancellation_type_t cancel_state);
};
} // namespace boost::redis::detail
#endif // BOOST_REDIS_CONNECTOR_HPP

View File

@@ -0,0 +1,125 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/config.hpp>
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/run_fsm.hpp>
#include <boost/redis/detail/setup_request_utils.hpp>
#include <boost/redis/impl/is_terminal_cancel.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/local/basic_endpoint.hpp> // for BOOST_ASIO_HAS_LOCAL_SOCKETS
#include <boost/system/error_code.hpp>
namespace boost::redis::detail {
inline system::error_code check_config(const config& cfg)
{
if (!cfg.unix_socket.empty()) {
if (cfg.use_ssl)
return error::unix_sockets_ssl_unsupported;
#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
return error::unix_sockets_unsupported;
#endif
}
return system::error_code{};
}
inline void compose_ping_request(const config& cfg, request& to)
{
to.clear();
to.push("PING", cfg.health_check_id);
}
run_action run_fsm::resume(
connection_state& st,
system::error_code ec,
asio::cancellation_type_t cancel_state)
{
switch (resume_point_) {
BOOST_REDIS_CORO_INITIAL
// Check config
ec = check_config(st.cfg);
if (ec) {
st.logger.log(logger::level::err, "Invalid configuration", ec);
stored_ec_ = ec;
BOOST_REDIS_YIELD(resume_point_, 1, run_action_type::immediate)
return stored_ec_;
}
// Compose the setup request. This only depends on the config, so it can be done just once
compose_setup_request(st.cfg);
// Compose the PING request. Same as above
compose_ping_request(st.cfg, st.ping_req);
for (;;) {
// Try to connect
BOOST_REDIS_YIELD(resume_point_, 2, run_action_type::connect)
// Check for cancellations
if (is_terminal_cancel(cancel_state)) {
st.logger.trace("Run: cancelled (1)");
return system::error_code(asio::error::operation_aborted);
}
// If we were successful, run all the connection tasks
if (!ec) {
// Initialization
st.mpx.reset();
clear_response(st.setup_resp);
// Run the tasks
BOOST_REDIS_YIELD(resume_point_, 3, run_action_type::parallel_group)
// Store any error yielded by the tasks for later
stored_ec_ = ec;
// We've lost connection or otherwise been cancelled.
// Remove from the multiplexer the required requests.
st.mpx.cancel_on_conn_lost();
// The receive operation must be cancelled because channel
// subscription does not survive a reconnection but requires
// re-subscription.
BOOST_REDIS_YIELD(resume_point_, 4, run_action_type::cancel_receive)
// Restore the error
ec = stored_ec_;
}
// Check for cancellations
if (is_terminal_cancel(cancel_state)) {
st.logger.trace("Run: cancelled (2)");
return system::error_code(asio::error::operation_aborted);
}
// If we are not going to try again, we're done
if (st.cfg.reconnect_wait_interval.count() == 0) {
return ec;
}
// Wait for the reconnection interval
BOOST_REDIS_YIELD(resume_point_, 5, run_action_type::wait_for_reconnection)
// Check for cancellations
if (is_terminal_cancel(cancel_state)) {
st.logger.trace("Run: cancelled (3)");
return system::error_code(asio::error::operation_aborted);
}
}
}
// We should never get here
BOOST_ASSERT(false);
return system::error_code();
}
} // namespace boost::redis::detail

View File

@@ -15,6 +15,7 @@
#include <boost/redis/impl/read_buffer.ipp> #include <boost/redis/impl/read_buffer.ipp>
#include <boost/redis/impl/writer_fsm.ipp> #include <boost/redis/impl/writer_fsm.ipp>
#include <boost/redis/impl/reader_fsm.ipp> #include <boost/redis/impl/reader_fsm.ipp>
#include <boost/redis/impl/run_fsm.ipp>
#include <boost/redis/impl/request.ipp> #include <boost/redis/impl/request.ipp>
#include <boost/redis/impl/setup_request_utils.ipp> #include <boost/redis/impl/setup_request_utils.ipp>
#include <boost/redis/impl/response.ipp> #include <boost/redis/impl/response.ipp>

View File

@@ -43,6 +43,7 @@ make_test(test_conn_logging)
make_test(test_writer_fsm) make_test(test_writer_fsm)
make_test(test_reader_fsm) make_test(test_reader_fsm)
make_test(test_connect_fsm) make_test(test_connect_fsm)
make_test(test_run_fsm)
make_test(test_setup_request_utils) make_test(test_setup_request_utils)
make_test(test_multiplexer) make_test(test_multiplexer)

View File

@@ -59,6 +59,7 @@ local tests =
test_conn_logging test_conn_logging
test_writer_fsm test_writer_fsm
test_reader_fsm test_reader_fsm
test_run_fsm
test_connect_fsm test_connect_fsm
test_setup_request_utils test_setup_request_utils
test_multiplexer test_multiplexer

View File

@@ -57,12 +57,6 @@ std::ostream& operator<<(std::ostream& os, const log_message& v)
return os << "log_message { .lvl=" << to_string(v.lvl) << ", .msg=" << v.msg << " }"; return os << "log_message { .lvl=" << to_string(v.lvl) << ", .msg=" << v.msg << " }";
} }
log_fixture::log_fixture()
: lgr{logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) {
msgs.push_back({lvl, std::string(msg)});
})}
{ }
void log_fixture::check_log(std::initializer_list<const log_message> expected, source_location loc) void log_fixture::check_log(std::initializer_list<const log_message> expected, source_location loc)
const const
{ {
@@ -71,4 +65,11 @@ void log_fixture::check_log(std::initializer_list<const log_message> expected, s
} }
} }
logger log_fixture::make_logger()
{
return logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) {
msgs.push_back({lvl, std::string(msg)});
});
}
} // namespace boost::redis::detail } // namespace boost::redis::detail

View File

@@ -7,7 +7,6 @@
#ifndef BOOST_REDIS_TEST_SANSIO_UTILS_HPP #ifndef BOOST_REDIS_TEST_SANSIO_UTILS_HPP
#define BOOST_REDIS_TEST_SANSIO_UTILS_HPP #define BOOST_REDIS_TEST_SANSIO_UTILS_HPP
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/logger.hpp> #include <boost/redis/logger.hpp>
#include <boost/assert/source_location.hpp> #include <boost/assert/source_location.hpp>
@@ -38,12 +37,11 @@ struct log_message {
struct log_fixture { struct log_fixture {
std::vector<log_message> msgs; std::vector<log_message> msgs;
detail::connection_logger lgr;
log_fixture();
void check_log( void check_log(
std::initializer_list<const log_message> expected, std::initializer_list<const log_message> expected,
source_location loc = BOOST_CURRENT_LOCATION) const; source_location loc = BOOST_CURRENT_LOCATION) const;
logger make_logger();
}; };
} // namespace boost::redis::detail } // namespace boost::redis::detail

View File

@@ -105,6 +105,7 @@ auto resolver_data = [] {
// Reduce duplication // Reduce duplication
struct fixture : detail::log_fixture { struct fixture : detail::log_fixture {
config cfg; config cfg;
connection_logger lgr{make_logger()};
connect_fsm fsm{cfg, lgr}; connect_fsm fsm{cfg, lgr};
redis_stream_state st{}; redis_stream_state st{};

475
test/test_run_fsm.cpp Normal file
View File

@@ -0,0 +1,475 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/run_fsm.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/local/basic_endpoint.hpp> // for BOOST_ASIO_HAS_LOCAL_SOCKETS
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include "sansio_utils.hpp"
#include <ostream>
#include <string_view>
using namespace boost::redis;
namespace asio = boost::asio;
using detail::run_fsm;
using detail::multiplexer;
using detail::run_action_type;
using detail::run_action;
using boost::system::error_code;
using boost::asio::cancellation_type_t;
using detail::connection_logger;
using namespace std::chrono_literals;
// Operators
static const char* to_string(run_action_type value)
{
switch (value) {
case run_action_type::done: return "run_action_type::done";
case run_action_type::immediate: return "run_action_type::immediate";
case run_action_type::connect: return "run_action_type::connect";
case run_action_type::parallel_group: return "run_action_type::parallel_group";
case run_action_type::cancel_receive: return "run_action_type::cancel_receive";
case run_action_type::wait_for_reconnection: return "run_action_type::wait_for_reconnection";
default: return "<unknown run_action_type>";
}
}
namespace boost::redis::detail {
std::ostream& operator<<(std::ostream& os, run_action_type type)
{
os << to_string(type);
return os;
}
bool operator==(const run_action& lhs, const run_action& rhs) noexcept
{
return lhs.type == rhs.type && lhs.ec == rhs.ec;
}
std::ostream& operator<<(std::ostream& os, const run_action& act)
{
os << "run_action{ .type=" << act.type;
if (act.type == run_action_type::done)
os << ", .error=" << act.ec;
return os << " }";
}
} // namespace boost::redis::detail
namespace {
struct fixture : detail::log_fixture {
detail::connection_state st;
run_fsm fsm;
fixture(config&& cfg = {})
: st{make_logger(), std::move(cfg)}
{ }
};
config config_no_reconnect()
{
config res;
res.reconnect_wait_interval = 0s;
return res;
}
// Config errors
#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
void test_config_error_unix()
{
// Setup
config cfg;
cfg.unix_socket = "/var/sock";
fixture fix{std::move(cfg)};
// Launching the operation fails immediately
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::immediate);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::unix_sockets_unsupported));
// Log
fix.check_log({
{logger::level::err,
"Invalid configuration: The configuration specified a UNIX socket address, but UNIX sockets "
"are not supported by the system. [boost.redis:24]"},
});
}
#endif
void test_config_error_unix_ssl()
{
// Setup
config cfg;
cfg.use_ssl = true;
cfg.unix_socket = "/var/sock";
fixture fix{std::move(cfg)};
// Launching the operation fails immediately
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::immediate);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::unix_sockets_ssl_unsupported));
// Log
fix.check_log({
{logger::level::err,
"Invalid configuration: The configuration specified UNIX sockets with SSL, which is not "
"supported. [boost.redis:25]"},
});
}
// An error in connect with reconnection enabled triggers a reconnection
void test_connect_error()
{
// Setup
fixture fix;
// Launch the operation
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
// Connect errors. We sleep and try to connect again
act = fix.fsm.resume(fix.st, error::connect_timeout, cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
// This time we succeed and we launch the parallel group
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// Run doesn't log, it's the subordinate tasks that do
fix.check_log({});
}
// An error in connect without reconnection enabled makes the operation finish
void test_connect_error_no_reconnect()
{
// Setup
fixture fix{config_no_reconnect()};
// Launch the operation
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
// Connect errors. The operation finishes
act = fix.fsm.resume(fix.st, error::connect_timeout, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::connect_timeout));
// Run doesn't log, it's the subordinate tasks that do
fix.check_log({});
}
// A cancellation in connect makes the operation finish even with reconnection enabled
void test_connect_cancel()
{
// Setup
fixture fix;
// Launch the operation
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
// Connect cancelled. The operation finishes
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// We log on cancellation only
fix.check_log({
{logger::level::debug, "Run: cancelled (1)"}
});
}
// Same, but only the cancellation is set
void test_connect_cancel_edge()
{
// Setup
fixture fix;
// Launch the operation
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
// Connect cancelled. The operation finishes
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// We log on cancellation only
fix.check_log({
{logger::level::debug, "Run: cancelled (1)"}
});
}
// An error in the parallel group triggers a reconnection
// (the parallel group always exits with an error)
void test_parallel_group_error()
{
// Setup
fixture fix;
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// This exits with an error. We sleep and connect again
act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// Run doesn't log, it's the subordinate tasks that do
fix.check_log({});
}
// An error in the parallel group makes the operation exit if reconnection is disabled
void test_parallel_group_error_no_reconnect()
{
// Setup
fixture fix{config_no_reconnect()};
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// This exits with an error. We cancel the receive operation and exit
act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::empty_field));
// Run doesn't log, it's the subordinate tasks that do
fix.check_log({});
}
// A cancellation in the parallel group makes it exit, even if reconnection is enabled.
// Parallel group tasks always exit with an error, so there is no edge case here
void test_parallel_group_cancel()
{
// Setup
fixture fix;
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// This exits because the operation gets cancelled. Any receive operation gets cancelled
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// We log on cancellation only
fix.check_log({
{logger::level::debug, "Run: cancelled (2)"}
});
}
void test_parallel_group_cancel_no_reconnect()
{
// Setup
fixture fix{config_no_reconnect()};
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// This exits because the operation gets cancelled. Any receive operation gets cancelled
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// We log on cancellation only
fix.check_log({
{logger::level::debug, "Run: cancelled (2)"}
});
}
// If the reconnection wait gets cancelled, we exit
void test_wait_cancel()
{
// Setup
fixture fix;
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// This exits with an error. We sleep
act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection);
// We get cancelled during the sleep
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// We log on cancellation only
fix.check_log({
{logger::level::debug, "Run: cancelled (3)"}
});
}
void test_wait_cancel_edge()
{
// Setup
fixture fix;
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// This exits with an error. We sleep
act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection);
// We get cancelled during the sleep
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// We log on cancellation only
fix.check_log({
{logger::level::debug, "Run: cancelled (3)"}
});
}
void test_several_reconnections()
{
// Setup
fixture fix;
// Run the operation. Connect errors and we sleep
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error::connect_timeout, cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection);
// Connect again, this time successfully. We launch the tasks
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// This exits with an error. We sleep and connect again
act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// Exit with cancellation
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// The cancellation was logged
fix.check_log({
{logger::level::debug, "Run: cancelled (2)"}
});
}
// Setup and ping requests are only composed once at startup
void test_setup_ping_requests()
{
// Setup
config cfg;
cfg.health_check_id = "some_value";
cfg.username = "foo";
cfg.password = "bar";
cfg.clientname = "";
fixture fix{std::move(cfg)};
// Run the operation. We connect and launch the tasks
auto act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// At this point, the requests are set up
const std::string_view expected_ping = "*2\r\n$4\r\nPING\r\n$10\r\nsome_value\r\n";
const std::string_view
expected_setup = "*5\r\n$5\r\nHELLO\r\n$1\r\n3\r\n$4\r\nAUTH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n";
BOOST_TEST_EQ(fix.st.ping_req.payload(), expected_ping);
BOOST_TEST_EQ(fix.st.cfg.setup.payload(), expected_setup);
// Reconnect
act = fix.fsm.resume(fix.st, error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::cancel_receive);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::wait_for_reconnection);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::connect);
act = fix.fsm.resume(fix.st, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, run_action_type::parallel_group);
// The requests haven't been modified
BOOST_TEST_EQ(fix.st.ping_req.payload(), expected_ping);
BOOST_TEST_EQ(fix.st.cfg.setup.payload(), expected_setup);
}
} // namespace
int main()
{
#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
test_config_error_unix();
#endif
test_config_error_unix_ssl();
test_connect_error();
test_connect_error_no_reconnect();
test_connect_cancel();
test_connect_cancel_edge();
test_parallel_group_error();
test_parallel_group_error_no_reconnect();
test_parallel_group_cancel();
test_parallel_group_cancel_no_reconnect();
test_wait_cancel();
test_wait_cancel_edge();
test_several_reconnections();
test_setup_ping_requests();
return boost::report_errors();
}

View File

@@ -90,6 +90,7 @@ struct test_elem {
struct fixture : detail::log_fixture { struct fixture : detail::log_fixture {
multiplexer mpx; multiplexer mpx;
connection_logger lgr{make_logger()};
writer_fsm fsm{mpx, lgr}; writer_fsm fsm{mpx, lgr};
}; };