2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Implements connect as a FSM and fixes cancellation (#320)

Implements redis_stream::async_connect as a FSM
Adds per-operation cancellation handling code
Adds tests
This commit is contained in:
Anarthal (Rubén Pérez)
2025-10-06 12:39:28 +02:00
committed by GitHub
parent 1812be87bf
commit 0c159280ba
9 changed files with 1050 additions and 132 deletions

View File

@@ -531,7 +531,7 @@ public:
for (;;) {
// Try to connect
BOOST_ASIO_CORO_YIELD
conn_->stream_.async_connect(&conn_->cfg_, &conn_->logger_, std::move(self));
conn_->stream_.async_connect(conn_->cfg_, conn_->logger_, std::move(self));
// Check for cancellations
if (is_cancelled(self)) {

View File

@@ -0,0 +1,97 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef BOOST_REDIS_CONNECT_FSM_HPP
#define BOOST_REDIS_CONNECT_FSM_HPP
#include <boost/redis/config.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/system/error_code.hpp>
// Sans-io algorithm for redis_stream::async_connect, as a finite state machine
namespace boost::redis::detail {
class connection_logger;
// What transport is redis_stream using?
enum class transport_type
{
tcp, // plaintext TCP
tcp_tls, // TLS over TCP
unix_socket, // UNIX domain sockets
};
struct redis_stream_state {
transport_type type{transport_type::tcp};
bool ssl_stream_used{false};
};
// What should we do next?
enum class connect_action_type
{
unix_socket_close, // Close the UNIX socket, to discard state
unix_socket_connect, // Connect to the UNIX socket
tcp_resolve, // Name resolution
tcp_connect, // TCP connect
ssl_stream_reset, // Re-create the SSL stream, to discard state
ssl_handshake, // SSL handshake
done, // Complete the async op
};
struct connect_action {
connect_action_type type;
system::error_code ec;
connect_action(connect_action_type type) noexcept
: type{type}
{ }
connect_action(system::error_code ec) noexcept
: type{connect_action_type::done}
, ec{ec}
{ }
};
class connect_fsm {
int resume_point_{0};
const config* cfg_{nullptr};
connection_logger* lgr_{nullptr};
public:
connect_fsm(const config& cfg, connection_logger& lgr) noexcept
: cfg_(&cfg)
, lgr_(&lgr)
{ }
const config& get_config() const { return *cfg_; }
connect_action resume(
system::error_code ec,
const asio::ip::tcp::resolver::results_type& resolver_results,
redis_stream_state& st,
asio::cancellation_type_t cancel_state);
connect_action resume(
system::error_code ec,
const asio::ip::tcp::endpoint& selected_endpoint,
redis_stream_state& st,
asio::cancellation_type_t cancel_state);
connect_action resume(
system::error_code ec,
redis_stream_state& st,
asio::cancellation_type_t cancel_state);
}; // namespace boost::redis::detail
} // namespace boost::redis::detail
#endif

View File

@@ -8,6 +8,7 @@
#define BOOST_REDIS_REDIS_STREAM_HPP
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connect_fsm.hpp>
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/error.hpp>
@@ -31,14 +32,6 @@ namespace boost {
namespace redis {
namespace detail {
// What transport is redis_stream using?
enum class transport_type
{
tcp, // plaintext TCP
tcp_tls, // TLS over TCP
unix_socket, // UNIX domain sockets
};
template <class Executor>
class redis_stream {
asio::ssl::context ssl_ctx_;
@@ -48,139 +41,102 @@ class redis_stream {
asio::basic_stream_socket<asio::local::stream_protocol, Executor> unix_socket_;
#endif
typename asio::steady_timer::template rebind_executor<Executor>::other timer_;
transport_type transport_{transport_type::tcp};
bool ssl_stream_used_{false};
redis_stream_state st_;
void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; }
static transport_type transport_from_config(const config& cfg)
{
if (cfg.unix_socket.empty()) {
if (cfg.use_ssl) {
return transport_type::tcp_tls;
} else {
return transport_type::tcp;
}
} else {
BOOST_ASSERT(!cfg.use_ssl);
return transport_type::unix_socket;
}
}
struct connect_op {
redis_stream& obj;
const config* cfg;
connection_logger* lgr;
asio::coroutine coro{};
connect_fsm fsm_;
// This overload will be used for connects. We only need the endpoint
// for logging, so log it and call the coroutine
template <class Self>
void execute_action(Self& self, connect_action act)
{
const auto& cfg = fsm_.get_config();
switch (act.type) {
case connect_action_type::unix_socket_close:
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
{
system::error_code ec;
obj.unix_socket_.close(ec);
(*this)(self, ec); // This is a sync action
}
#else
BOOST_ASSERT(false);
#endif
return;
case connect_action_type::unix_socket_connect:
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
obj.unix_socket_.async_connect(
cfg.unix_socket,
asio::cancel_after(obj.timer_, cfg.connect_timeout, std::move(self)));
#else
BOOST_ASSERT(false);
#endif
return;
case connect_action_type::tcp_resolve:
obj.resolv_.async_resolve(
cfg.addr.host,
cfg.addr.port,
asio::cancel_after(obj.timer_, cfg.resolve_timeout, std::move(self)));
return;
case connect_action_type::ssl_stream_reset:
obj.reset_stream();
// this action does not require yielding. Execute the next action immediately
(*this)(self);
return;
case connect_action_type::ssl_handshake:
obj.stream_.async_handshake(
asio::ssl::stream_base::client,
asio::cancel_after(obj.timer_, cfg.ssl_handshake_timeout, std::move(self)));
return;
case connect_action_type::done: self.complete(act.ec); break;
// Connect should use the specialized handler, where resolver results are available
case connect_action_type::tcp_connect:
default: BOOST_ASSERT(false);
}
}
// This overload will be used for connects
template <class Self>
void operator()(
Self& self,
system::error_code ec,
const asio::ip::tcp::endpoint& selected_endpoint)
{
lgr->on_connect(ec, selected_endpoint);
(*this)(self, ec);
auto act = fsm_.resume(
ec,
selected_endpoint,
obj.st_,
self.get_cancellation_state().cancelled());
execute_action(self, act);
}
// This overload will be used for resolves
template <class Self>
void operator()(
Self& self,
system::error_code ec = {},
asio::ip::tcp::resolver::results_type resolver_results = {})
system::error_code ec,
asio::ip::tcp::resolver::results_type endpoints)
{
BOOST_ASIO_CORO_REENTER(coro)
{
// Record the transport that we will be using
obj.transport_ = transport_from_config(*cfg);
if (obj.transport_ == transport_type::unix_socket) {
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
// Discard any existing state
obj.unix_socket_.close(ec);
// Directly connect to the socket
BOOST_ASIO_CORO_YIELD
obj.unix_socket_.async_connect(
cfg->unix_socket,
asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
// Log it
lgr->on_connect(ec, cfg->unix_socket);
// If this failed, we can't continue
if (ec) {
self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
return;
}
#else
BOOST_ASSERT(false);
#endif
} else {
// ssl::stream doesn't support being re-used. If we're to use
// TLS and the stream has been used, re-create it.
// Must be done before anything else is done on the stream.
// Note that we don't need to close the socket here because
// range connect does it for us.
if (cfg->use_ssl && obj.ssl_stream_used_)
obj.reset_stream();
BOOST_ASIO_CORO_YIELD
obj.resolv_.async_resolve(
cfg->addr.host,
cfg->addr.port,
asio::cancel_after(obj.timer_, cfg->resolve_timeout, std::move(self)));
// Log it
lgr->on_resolve(ec, resolver_results);
// If this failed, we can't continue
if (ec) {
self.complete(ec == asio::error::operation_aborted ? error::resolve_timeout : ec);
return;
}
// Connect to the address that the resolver provided us
BOOST_ASIO_CORO_YIELD
auto act = fsm_.resume(ec, endpoints, obj.st_, self.get_cancellation_state().cancelled());
if (act.type == connect_action_type::tcp_connect) {
asio::async_connect(
obj.stream_.next_layer(),
std::move(resolver_results),
asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
// Note: logging is performed in the specialized operator() function.
// If this failed, we can't continue
if (ec) {
self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
return;
}
if (cfg->use_ssl) {
// Mark the SSL stream as used
obj.ssl_stream_used_ = true;
// If we were configured to use TLS, perform the handshake
BOOST_ASIO_CORO_YIELD
obj.stream_.async_handshake(
asio::ssl::stream_base::client,
asio::cancel_after(obj.timer_, cfg->ssl_handshake_timeout, std::move(self)));
lgr->on_ssl_handshake(ec);
// If this failed, we can't continue
if (ec) {
self.complete(
ec == asio::error::operation_aborted ? error::ssl_handshake_timeout : ec);
return;
}
std::move(endpoints),
asio::cancel_after(obj.timer_, fsm_.get_config().connect_timeout, std::move(self)));
} else {
execute_action(self, act);
}
}
// Done
self.complete(system::error_code());
}
template <class Self>
void operator()(Self& self, system::error_code ec = {})
{
auto act = fsm_.resume(ec, obj.st_, self.get_cancellation_state().cancelled());
execute_action(self, act);
}
};
@@ -204,7 +160,7 @@ public:
bool is_open() const
{
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
if (transport_ == transport_type::unix_socket)
if (st_.type == transport_type::unix_socket)
return unix_socket_.is_open();
#endif
return stream_.next_layer().is_open();
@@ -214,10 +170,10 @@ public:
// I/O
template <class CompletionToken>
auto async_connect(const config* cfg, connection_logger* l, CompletionToken&& token)
auto async_connect(const config& cfg, connection_logger& l, CompletionToken&& token)
{
return asio::async_compose<CompletionToken, void(system::error_code)>(
connect_op{*this, cfg, l},
connect_op{*this, connect_fsm(cfg, l)},
token);
}
@@ -225,7 +181,7 @@ public:
template <class ConstBufferSequence, class CompletionToken>
void async_write_some(const ConstBufferSequence& buffers, CompletionToken&& token)
{
switch (transport_) {
switch (st_.type) {
case transport_type::tcp:
{
stream_.next_layer().async_write_some(buffers, std::forward<CompletionToken>(token));
@@ -250,7 +206,7 @@ public:
template <class MutableBufferSequence, class CompletionToken>
void async_read_some(const MutableBufferSequence& buffers, CompletionToken&& token)
{
switch (transport_) {
switch (st_.type) {
case transport_type::tcp:
{
return stream_.next_layer().async_read_some(

View File

@@ -0,0 +1,176 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connect_fsm.hpp>
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/error.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
namespace boost::redis::detail {
inline transport_type transport_from_config(const config& cfg)
{
if (cfg.unix_socket.empty()) {
if (cfg.use_ssl) {
return transport_type::tcp_tls;
} else {
return transport_type::tcp;
}
} else {
BOOST_ASSERT(!cfg.use_ssl);
return transport_type::unix_socket;
}
}
inline system::error_code translate_timeout_error(
system::error_code io_ec,
asio::cancellation_type_t cancel_state,
error code_if_cancelled)
{
// Translates cancellations and timeout errors into a single error_code.
// - Cancellation state set, and an I/O error: the entire operation was cancelled.
// The I/O code (probably operation_aborted) is appropriate.
// - Cancellation state set, and no I/O error: same as above, but the cancellation
// arrived after the operation completed and before the handler was called. Set the code here.
// - No cancellation state set, I/O error set to operation_aborted: since we use cancel_after,
// this means a timeout.
// - Otherwise, respect the I/O error.
if ((cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none) {
return io_ec ? io_ec : asio::error::operation_aborted;
}
return io_ec == asio::error::operation_aborted ? code_if_cancelled : io_ec;
}
connect_action connect_fsm::resume(
system::error_code ec,
const asio::ip::tcp::resolver::results_type& resolver_results,
redis_stream_state& st,
asio::cancellation_type_t cancel_state)
{
// Translate error codes
ec = translate_timeout_error(ec, cancel_state, error::resolve_timeout);
// Log it
lgr_->on_resolve(ec, resolver_results);
// Delegate to the regular resume function
return resume(ec, st, cancel_state);
}
connect_action connect_fsm::resume(
system::error_code ec,
const asio::ip::tcp::endpoint& selected_endpoint,
redis_stream_state& st,
asio::cancellation_type_t cancel_state)
{
// Translate error codes
ec = translate_timeout_error(ec, cancel_state, error::connect_timeout);
// Log it
lgr_->on_connect(ec, selected_endpoint);
// Delegate to the regular resume function
return resume(ec, st, cancel_state);
}
connect_action connect_fsm::resume(
system::error_code ec,
redis_stream_state& st,
asio::cancellation_type_t cancel_state)
{
switch (resume_point_) {
BOOST_REDIS_CORO_INITIAL
// Record the transport that we will be using
st.type = transport_from_config(*cfg_);
if (st.type == transport_type::unix_socket) {
// Reset the socket, to discard any previous state. Ignore any errors
BOOST_REDIS_YIELD(resume_point_, 1, connect_action_type::unix_socket_close)
// Connect to the socket
BOOST_REDIS_YIELD(resume_point_, 2, connect_action_type::unix_socket_connect)
// Fix error codes. If we were cancelled and the code is operation_aborted,
// it is because per-operation cancellation was activated. If we were not cancelled
// but the operation failed with operation_aborted, it's a timeout.
// Also check for cancellations that didn't cause a failure
ec = translate_timeout_error(ec, cancel_state, error::connect_timeout);
// Log it
lgr_->on_connect(ec, cfg_->unix_socket);
// If this failed, we can't continue
if (ec) {
return ec;
}
// Done
return system::error_code();
} else {
// ssl::stream doesn't support being re-used. If we're to use
// TLS and the stream has been used, re-create it.
// Must be done before anything else is done on the stream.
// We don't need to close the TCP socket if using plaintext TCP
// because range-connect closes open sockets, while individual connect doesn't
if (cfg_->use_ssl && st.ssl_stream_used) {
BOOST_REDIS_YIELD(resume_point_, 3, connect_action_type::ssl_stream_reset)
}
// Resolve names. The continuation needs access to the returned
// endpoints, and is a specialized resume() that will call this function
BOOST_REDIS_YIELD(resume_point_, 4, connect_action_type::tcp_resolve)
// If this failed, we can't continue (error code translation already performed here)
if (ec) {
return ec;
}
// Now connect to the endpoints returned by the resolver.
// This has a specialized resume(), too
BOOST_REDIS_YIELD(resume_point_, 5, connect_action_type::tcp_connect)
// If this failed, we can't continue (error code translation already performed here)
if (ec) {
return ec;
}
if (cfg_->use_ssl) {
// Mark the SSL stream as used
st.ssl_stream_used = true;
// Perform the TLS handshake
BOOST_REDIS_YIELD(resume_point_, 6, connect_action_type::ssl_handshake)
// Translate error codes
ec = translate_timeout_error(ec, cancel_state, error::ssl_handshake_timeout);
// Log it
lgr_->on_ssl_handshake(ec);
// If this failed, we can't continue
if (ec) {
return ec;
}
}
// Done
return system::error_code();
}
}
BOOST_ASSERT(false);
return system::error_code();
}
} // namespace boost::redis::detail

View File

@@ -103,7 +103,7 @@ void connection_logger::on_connect(system::error_code const& ec, asio::ip::tcp::
return;
if (ec) {
msg_ = "Failed connecting to the server: ";
msg_ = "Failed to connect to the server: ";
format_error_code(ec, msg_);
} else {
msg_ = "Connected to ";
@@ -119,7 +119,7 @@ void connection_logger::on_connect(system::error_code const& ec, std::string_vie
return;
if (ec) {
msg_ = "Failed connecting to the server: ";
msg_ = "Failed to connect to the server: ";
format_error_code(ec, msg_);
} else {
msg_ = "Connected to ";
@@ -134,8 +134,12 @@ void connection_logger::on_ssl_handshake(system::error_code const& ec)
if (logger_.lvl < logger::level::info)
return;
msg_ = "SSL handshake: ";
if (ec) {
msg_ = "Failed to perform SSL handshake: ";
format_error_code(ec, msg_);
} else {
msg_ = "Successfully performed SSL handshake";
}
logger_.fn(logger::level::info, msg_);
}

View File

@@ -8,6 +8,7 @@
#include <boost/redis/impl/connection_logger.ipp>
#include <boost/redis/impl/error.ipp>
#include <boost/redis/impl/exec_fsm.ipp>
#include <boost/redis/impl/connect_fsm.ipp>
#include <boost/redis/impl/ignore.ipp>
#include <boost/redis/impl/logger.ipp>
#include <boost/redis/impl/multiplexer.ipp>

View File

@@ -41,6 +41,7 @@ make_test(test_exec_fsm)
make_test(test_log_to_file)
make_test(test_conn_logging)
make_test(test_reader_fsm)
make_test(test_connect_fsm)
make_test(test_setup_request_utils)
make_test(test_multiplexer)

View File

@@ -58,6 +58,7 @@ local tests =
test_log_to_file
test_conn_logging
test_reader_fsm
test_connect_fsm
test_setup_request_utils
test_multiplexer
;

682
test/test_connect_fsm.cpp Normal file
View File

@@ -0,0 +1,682 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/config.hpp>
#include <boost/redis/detail/connect_fsm.hpp>
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/core/lightweight_test.hpp>
#include <iterator>
#include <ostream>
#include <string>
#include <string_view>
#include <vector>
using namespace boost::redis;
namespace asio = boost::asio;
using detail::connect_fsm;
using detail::connect_action_type;
using detail::connect_action;
using detail::connection_logger;
using detail::redis_stream_state;
using detail::transport_type;
using asio::ip::tcp;
using boost::system::error_code;
using boost::asio::cancellation_type_t;
using resolver_results = tcp::resolver::results_type;
// Operators
static const char* to_string(connect_action_type type)
{
switch (type) {
case connect_action_type::unix_socket_close: return "connect_action_type::unix_socket_close";
case connect_action_type::unix_socket_connect:
return "connect_action_type::unix_socket_connect";
case connect_action_type::tcp_resolve: return "connect_action_type::tcp_resolve";
case connect_action_type::tcp_connect: return "connect_action_type::tcp_connect";
case connect_action_type::ssl_stream_reset: return "connect_action_type::ssl_stream_reset";
case connect_action_type::ssl_handshake: return "connect_action_type::ssl_handshake";
case connect_action_type::done: return "connect_action_type::done";
default: return "<unknown connect_action_type>";
}
}
static const char* to_string(transport_type type)
{
switch (type) {
case transport_type::tcp: return "transport_type::tcp";
case transport_type::tcp_tls: return "transport_type::tcp_tls";
case transport_type::unix_socket: return "transport_type::unix_socket";
default: return "<unknown transport_type>";
}
}
static const char* to_string(logger::level lvl)
{
switch (lvl) {
case logger::level::disabled: return "logger::level::disabled";
case logger::level::emerg: return "logger::level::emerg";
case logger::level::alert: return "logger::level::alert";
case logger::level::crit: return "logger::level::crit";
case logger::level::err: return "logger::level::err";
case logger::level::warning: return "logger::level::warning";
case logger::level::notice: return "logger::level::notice";
case logger::level::info: return "logger::level::info";
case logger::level::debug: return "logger::level::debug";
default: return "<unknown logger::level>";
}
}
namespace boost::redis {
std::ostream& operator<<(std::ostream& os, logger::level lvl) { return os << to_string(lvl); }
} // namespace boost::redis
namespace boost::redis::detail {
std::ostream& operator<<(std::ostream& os, connect_action_type type)
{
return os << to_string(type);
}
std::ostream& operator<<(std::ostream& os, transport_type type) { return os << to_string(type); }
bool operator==(const connect_action& lhs, const connect_action& rhs) noexcept
{
return lhs.type == rhs.type && lhs.ec == rhs.ec;
}
std::ostream& operator<<(std::ostream& os, const connect_action& act)
{
os << "connect_action{ .type=" << act.type;
if (act.type == connect_action_type::done)
os << ", .error=" << act.ec;
return os << " }";
}
} // namespace boost::redis::detail
namespace {
// TCP endpoints
const tcp::endpoint endpoint(asio::ip::make_address("192.168.10.1"), 1234);
const tcp::endpoint endpoint2(asio::ip::make_address("192.168.10.2"), 1235);
auto resolver_data = [] {
const tcp::endpoint data[] = {endpoint, endpoint2};
return asio::ip::tcp::resolver::results_type::create(
std::begin(data),
std::end(data),
"my_host",
"1234");
}();
// For checking logs
struct log_message {
logger::level lvl;
std::string msg;
friend bool operator==(const log_message& lhs, const log_message& rhs) noexcept
{
return lhs.lvl == rhs.lvl && lhs.msg == rhs.msg;
}
friend std::ostream& operator<<(std::ostream& os, const log_message& v)
{
return os << "log_message { .lvl=" << v.lvl << ", .msg=" << v.msg << " }";
}
};
// Reduce duplication
struct fixture {
config cfg;
std::ostringstream oss{};
std::vector<log_message> msgs{};
detail::connection_logger lgr{
logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) {
msgs.push_back({lvl, std::string(msg)});
})};
connect_fsm fsm{cfg, lgr};
redis_stream_state st{};
};
config make_ssl_config()
{
config cfg;
cfg.use_ssl = true;
return cfg;
}
config make_unix_config()
{
config cfg;
cfg.unix_socket = "/run/redis.sock";
return cfg;
}
void test_tcp_success()
{
// Setup
fixture fix;
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::done);
// The transport type was appropriately set
BOOST_TEST_EQ(fix.st.type, transport_type::tcp);
BOOST_TEST_NOT(fix.st.ssl_stream_used);
// Check logging
const log_message expected[] = {
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
{logger::level::info, "Connected to 192.168.10.1:1234" },
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_tcp_tls_success()
{
// Setup
fixture fix{make_ssl_config()};
// Run the algorithm. No SSL stream reset is performed here
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::ssl_handshake);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::done);
// The transport type was appropriately set
BOOST_TEST_EQ(fix.st.type, transport_type::tcp_tls);
BOOST_TEST(fix.st.ssl_stream_used);
// Check logging
const log_message expected[] = {
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
{logger::level::info, "Connected to 192.168.10.1:1234" },
{logger::level::info, "Successfully performed SSL handshake" },
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_tcp_tls_success_reconnect()
{
// Setup
fixture fix{make_ssl_config()};
fix.st.ssl_stream_used = true;
// Run the algorithm. The stream is used, so it needs to be reset
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::ssl_stream_reset);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::ssl_handshake);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::done);
// The transport type was appropriately set
BOOST_TEST_EQ(fix.st.type, transport_type::tcp_tls);
BOOST_TEST(fix.st.ssl_stream_used);
// Check logging
const log_message expected[] = {
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
{logger::level::info, "Connected to 192.168.10.1:1234" },
{logger::level::info, "Successfully performed SSL handshake" },
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_unix_success()
{
// Setup
fixture fix{make_unix_config()};
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_close);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::done);
// The transport type was appropriately set
BOOST_TEST_EQ(fix.st.type, transport_type::unix_socket);
BOOST_TEST_NOT(fix.st.ssl_stream_used);
// Check logging
const log_message expected[] = {
{logger::level::info, "Connected to /run/redis.sock"},
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
// Close errors are ignored
void test_unix_success_close_error()
{
// Setup
fixture fix{make_unix_config()};
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_close);
act = fix.fsm.resume(asio::error::bad_descriptor, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::done);
// The transport type was appropriately set
BOOST_TEST_EQ(fix.st.type, transport_type::unix_socket);
BOOST_TEST_NOT(fix.st.ssl_stream_used);
// Check logging
const log_message expected[] = {
{logger::level::info, "Connected to /run/redis.sock"},
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
// Resolve errors
void test_tcp_resolve_error()
{
// Setup
fixture fix;
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error::empty_field, resolver_results{}, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::empty_field));
// Check logging
const log_message expected[] = {
// clang-format off
{logger::level::info, "Error resolving the server hostname: Expected field value is empty. [boost.redis:5]"},
// clang-format on
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_tcp_resolve_timeout()
{
// Setup
fixture fix;
// Since we use cancel_after, a timeout is an operation_aborted without a cancellation state set
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(
asio::error::operation_aborted,
resolver_results{},
fix.st,
cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::resolve_timeout));
// Check logging
const log_message expected[] = {
// clang-format off
{logger::level::info, "Error resolving the server hostname: Resolve timeout. [boost.redis:17]"},
// clang-format on
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_tcp_resolve_cancel()
{
// Setup
fixture fix;
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(
asio::error::operation_aborted,
resolver_results{},
fix.st,
cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// Logging here is system-dependent, so we don't check the message
BOOST_TEST_EQ(fix.msgs.size(), 1u);
}
void test_tcp_resolve_cancel_edge()
{
// Setup
fixture fix;
// Cancel state set but no error
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_results{}, fix.st, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// Logging here is system-dependent, so we don't check the message
BOOST_TEST_EQ(fix.msgs.size(), 1u);
}
// Connect errors
void test_tcp_connect_error()
{
// Setup
fixture fix;
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error::empty_field, tcp::endpoint{}, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::empty_field));
// Check logging
const log_message expected[] = {
// clang-format off
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
{logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"},
// clang-format on
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_tcp_connect_timeout()
{
// Setup
fixture fix;
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(
asio::error::operation_aborted,
tcp::endpoint{},
fix.st,
cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::connect_timeout));
// Check logging
const log_message expected[] = {
// clang-format off
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
{logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"},
// clang-format on
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_tcp_connect_cancel()
{
// Setup
fixture fix;
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(
asio::error::operation_aborted,
tcp::endpoint{},
fix.st,
cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// Logging here is system-dependent, so we don't check the message
BOOST_TEST_EQ(fix.msgs.size(), 2u);
}
void test_tcp_connect_cancel_edge()
{
// Setup
fixture fix;
// Run the algorithm. Cancellation state set but no error
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error_code(), tcp::endpoint{}, fix.st, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// Logging here is system-dependent, so we don't check the message
BOOST_TEST_EQ(fix.msgs.size(), 2u);
}
// SSL handshake error
void test_ssl_handshake_error()
{
// Setup
fixture fix{make_ssl_config()};
// Run the algorithm. No SSL stream reset is performed here
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::ssl_handshake);
act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::empty_field));
// The stream is marked as used
BOOST_TEST(fix.st.ssl_stream_used);
// Check logging
const log_message expected[] = {
// clang-format off
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
{logger::level::info, "Connected to 192.168.10.1:1234" },
{logger::level::info, "Failed to perform SSL handshake: Expected field value is empty. [boost.redis:5]"},
// clang-format on
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_ssl_handshake_timeout()
{
// Setup
fixture fix{make_ssl_config()};
// Run the algorithm. Timeout = operation_aborted without the cancel type set
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::ssl_handshake);
act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::ssl_handshake_timeout));
// The stream is marked as used
BOOST_TEST(fix.st.ssl_stream_used);
// Check logging
const log_message expected[] = {
// clang-format off
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
{logger::level::info, "Connected to 192.168.10.1:1234" },
{logger::level::info, "Failed to perform SSL handshake: SSL handshake timeout. [boost.redis:20]"},
// clang-format on
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_ssl_handshake_cancel()
{
// Setup
fixture fix{make_ssl_config()};
// Run the algorithm. Cancel = operation_aborted with the cancel type set
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::ssl_handshake);
act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// The stream is marked as used
BOOST_TEST(fix.st.ssl_stream_used);
// Logging is system-dependent, so we don't check messages
BOOST_TEST_EQ(fix.msgs.size(), 3u);
}
void test_ssl_handshake_cancel_edge()
{
// Setup
fixture fix{make_ssl_config()};
// Run the algorithm. No error, but the cancel state is set
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_resolve);
act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::tcp_connect);
act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::ssl_handshake);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// The stream is marked as used
BOOST_TEST(fix.st.ssl_stream_used);
// Logging is system-dependent, so we don't check messages
BOOST_TEST_EQ(fix.msgs.size(), 3u);
}
// UNIX connect errors
void test_unix_connect_error()
{
// Setup
fixture fix{make_unix_config()};
// Run the algorithm
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_close);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect);
act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::empty_field));
// Check logging
const log_message expected[] = {
// clang-format off
{logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"},
// clang-format on
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_unix_connect_timeout()
{
// Setup
fixture fix{make_unix_config()};
// Run the algorithm. Timeout = operation_aborted without a cancel state
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_close);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect);
act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::connect_timeout));
// Check logging
const log_message expected[] = {
// clang-format off
{logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"},
// clang-format on
};
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
}
void test_unix_connect_cancel()
{
// Setup
fixture fix{make_unix_config()};
// Run the algorithm. Cancel = operation_aborted with a cancel state
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_close);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect);
act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// Logging is system-dependent
BOOST_TEST_EQ(fix.msgs.size(), 1u);
}
void test_unix_connect_cancel_edge()
{
// Setup
fixture fix{make_unix_config()};
// Run the algorithm. No error, but cancel state is set
auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_close);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none);
BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect);
act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
// Logging is system-dependent
BOOST_TEST_EQ(fix.msgs.size(), 1u);
}
} // namespace
int main()
{
test_tcp_success();
test_tcp_tls_success();
test_tcp_tls_success_reconnect();
test_unix_success();
test_unix_success_close_error();
test_tcp_resolve_error();
test_tcp_resolve_timeout();
test_tcp_resolve_cancel();
test_tcp_resolve_cancel_edge();
test_tcp_connect_error();
test_tcp_connect_timeout();
test_tcp_connect_cancel();
test_tcp_connect_cancel_edge();
test_ssl_handshake_error();
test_ssl_handshake_timeout();
test_ssl_handshake_cancel();
test_ssl_handshake_cancel_edge();
test_unix_connect_error();
test_unix_connect_timeout();
test_unix_connect_cancel();
test_unix_connect_cancel_edge();
return boost::report_errors();
}