// // Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), // Ruben Perez Hidalgo (rubenperez038 at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include #include #include #include #include #include #include "sansio_utils.hpp" #include #include #include #include #include using namespace boost::redis; namespace asio = boost::asio; using detail::connect_fsm; using detail::connect_action_type; using detail::connect_action; using detail::buffered_logger; using detail::redis_stream_state; using detail::transport_type; using asio::ip::tcp; using boost::system::error_code; using boost::asio::cancellation_type_t; using resolver_results = tcp::resolver::results_type; // Operators static const char* to_string(connect_action_type type) { switch (type) { case connect_action_type::unix_socket_close: return "connect_action_type::unix_socket_close"; case connect_action_type::unix_socket_connect: return "connect_action_type::unix_socket_connect"; case connect_action_type::tcp_resolve: return "connect_action_type::tcp_resolve"; case connect_action_type::tcp_connect: return "connect_action_type::tcp_connect"; case connect_action_type::ssl_stream_reset: return "connect_action_type::ssl_stream_reset"; case connect_action_type::ssl_handshake: return "connect_action_type::ssl_handshake"; case connect_action_type::done: return "connect_action_type::done"; default: return ""; } } static const char* to_string(transport_type type) { switch (type) { case transport_type::tcp: return "transport_type::tcp"; case transport_type::tcp_tls: return "transport_type::tcp_tls"; case transport_type::unix_socket: return "transport_type::unix_socket"; default: return ""; } } namespace boost::redis::detail { std::ostream& operator<<(std::ostream& os, connect_action_type type) { return os << to_string(type); } std::ostream& operator<<(std::ostream& os, transport_type type) { return os << to_string(type); } bool operator==(const connect_action& lhs, const connect_action& rhs) noexcept { return lhs.type == rhs.type && lhs.ec == rhs.ec; } std::ostream& operator<<(std::ostream& os, const connect_action& act) { os << "connect_action{ .type=" << act.type; if (act.type == connect_action_type::done) os << ", .error=" << act.ec; return os << " }"; } } // namespace boost::redis::detail namespace { // TCP endpoints const tcp::endpoint endpoint(asio::ip::make_address("192.168.10.1"), 1234); const tcp::endpoint endpoint2(asio::ip::make_address("192.168.10.2"), 1235); auto resolver_data = [] { const tcp::endpoint data[] = {endpoint, endpoint2}; return asio::ip::tcp::resolver::results_type::create( std::begin(data), std::end(data), "my_host", "1234"); }(); // Reduce duplication struct fixture : detail::log_fixture { buffered_logger lgr{make_logger()}; connect_fsm fsm{lgr}; redis_stream_state st; fixture(transport_type type = transport_type::tcp) : st{type, false} { } }; void test_tcp_success() { // Setup fixture fix; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::done); // The transport type was appropriately set BOOST_TEST_NOT(fix.st.ssl_stream_used); // Check logging fix.check_log({ // clang-format off {logger::level::debug, "Connect: hostname resolution results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::debug, "Connect: TCP connect succeeded. Selected endpoint: 192.168.10.1:1234" }, // clang-format on }); } void test_tcp_tls_success() { // Setup fixture fix{transport_type::tcp_tls}; // Run the algorithm. No SSL stream reset is performed here auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::done); // The transport type was appropriately set BOOST_TEST(fix.st.ssl_stream_used); // Check logging fix.check_log({ // clang-format off {logger::level::debug, "Connect: hostname resolution results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::debug, "Connect: TCP connect succeeded. Selected endpoint: 192.168.10.1:1234" }, {logger::level::debug, "Connect: SSL handshake succeeded" }, // clang-format on }); } void test_tcp_tls_success_reconnect() { // Setup fixture fix{transport_type::tcp_tls}; fix.st.ssl_stream_used = true; // Run the algorithm. The stream is used, so it needs to be reset auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::ssl_stream_reset); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::done); // The transport type was appropriately set BOOST_TEST(fix.st.ssl_stream_used); // Check logging fix.check_log({ // clang-format off {logger::level::debug, "Connect: hostname resolution results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::debug, "Connect: TCP connect succeeded. Selected endpoint: 192.168.10.1:1234" }, {logger::level::debug, "Connect: SSL handshake succeeded" }, // clang-format on }); } void test_unix_success() { // Setup fixture fix{transport_type::unix_socket}; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::done); // The transport type was appropriately set BOOST_TEST_NOT(fix.st.ssl_stream_used); // Check logging fix.check_log({ {logger::level::debug, "Connect: UNIX socket connect succeeded"}, }); } // Close errors are ignored void test_unix_success_close_error() { // Setup fixture fix{transport_type::unix_socket}; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); act = fix.fsm.resume(asio::error::bad_descriptor, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::done); // The transport type was appropriately set BOOST_TEST_NOT(fix.st.ssl_stream_used); // Check logging fix.check_log({ {logger::level::debug, "Connect: UNIX socket connect succeeded"}, }); } // Resolve errors void test_tcp_resolve_error() { // Setup fixture fix; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error::empty_field, resolver_results{}, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::empty_field)); // Check logging fix.check_log({ // clang-format off {logger::level::info, "Connect: hostname resolution failed: Expected field value is empty. [boost.redis:5]"}, // clang-format on }); } void test_tcp_resolve_timeout() { // Setup fixture fix; // Since we use cancel_after, a timeout is an operation_aborted without a cancellation state set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume( asio::error::operation_aborted, resolver_results{}, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::resolve_timeout)); // Check logging fix.check_log({ // clang-format off {logger::level::info, "Connect: hostname resolution failed: Resolve timeout. [boost.redis:17]"}, // clang-format on }); } void test_tcp_resolve_cancel() { // Setup fixture fix; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume( asio::error::operation_aborted, resolver_results{}, fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); // Logging here is system-dependent, so we don't check the message BOOST_TEST_EQ(fix.msgs.size(), 1u); } void test_tcp_resolve_cancel_edge() { // Setup fixture fix; // Cancel state set but no error auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_results{}, fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); // Logging here is system-dependent, so we don't check the message BOOST_TEST_EQ(fix.msgs.size(), 1u); } // Connect errors void test_tcp_connect_error() { // Setup fixture fix; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error::empty_field, tcp::endpoint{}, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::empty_field)); // Check logging fix.check_log({ // clang-format off {logger::level::debug, "Connect: hostname resolution results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Connect: TCP connect failed: Expected field value is empty. [boost.redis:5]"}, // clang-format on }); } void test_tcp_connect_timeout() { // Setup fixture fix; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume( asio::error::operation_aborted, tcp::endpoint{}, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::connect_timeout)); // Check logging fix.check_log({ // clang-format off {logger::level::debug, "Connect: hostname resolution results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Connect: TCP connect failed: Connect timeout. [boost.redis:18]"}, // clang-format on }); } void test_tcp_connect_cancel() { // Setup fixture fix; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume( asio::error::operation_aborted, tcp::endpoint{}, fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); // Logging here is system-dependent, so we don't check the message BOOST_TEST_EQ(fix.msgs.size(), 2u); } void test_tcp_connect_cancel_edge() { // Setup fixture fix; // Run the algorithm. Cancellation state set but no error auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error_code(), tcp::endpoint{}, fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); // Logging here is system-dependent, so we don't check the message BOOST_TEST_EQ(fix.msgs.size(), 2u); } // SSL handshake error void test_ssl_handshake_error() { // Setup fixture fix{transport_type::tcp_tls}; // Run the algorithm. No SSL stream reset is performed here auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::empty_field)); // The stream is marked as used BOOST_TEST(fix.st.ssl_stream_used); // Check logging fix.check_log({ // clang-format off {logger::level::debug, "Connect: hostname resolution results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::debug, "Connect: TCP connect succeeded. Selected endpoint: 192.168.10.1:1234"}, {logger::level::info, "Connect: SSL handshake failed: Expected field value is empty. [boost.redis:5]"}, // clang-format on }); } void test_ssl_handshake_timeout() { // Setup fixture fix{transport_type::tcp_tls}; // Run the algorithm. Timeout = operation_aborted without the cancel type set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::ssl_handshake_timeout)); // The stream is marked as used BOOST_TEST(fix.st.ssl_stream_used); // Check logging fix.check_log({ // clang-format off {logger::level::debug, "Connect: hostname resolution results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::debug, "Connect: TCP connect succeeded. Selected endpoint: 192.168.10.1:1234"}, {logger::level::info, "Connect: SSL handshake failed: SSL handshake timeout. [boost.redis:20]"}, // clang-format on }); } void test_ssl_handshake_cancel() { // Setup fixture fix{transport_type::tcp_tls}; // Run the algorithm. Cancel = operation_aborted with the cancel type set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); // The stream is marked as used BOOST_TEST(fix.st.ssl_stream_used); // Logging is system-dependent, so we don't check messages BOOST_TEST_EQ(fix.msgs.size(), 3u); } void test_ssl_handshake_cancel_edge() { // Setup fixture fix{transport_type::tcp_tls}; // Run the algorithm. No error, but the cancel state is set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_resolve); act = fix.fsm.resume(error_code(), resolver_data, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::tcp_connect); act = fix.fsm.resume(error_code(), endpoint, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::ssl_handshake); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); // The stream is marked as used BOOST_TEST(fix.st.ssl_stream_used); // Logging is system-dependent, so we don't check messages BOOST_TEST_EQ(fix.msgs.size(), 3u); } // UNIX connect errors void test_unix_connect_error() { // Setup fixture fix{transport_type::unix_socket}; // Run the algorithm auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(error::empty_field, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::empty_field)); // Check logging fix.check_log({ // clang-format off {logger::level::info, "Connect: UNIX socket connect failed: Expected field value is empty. [boost.redis:5]"}, // clang-format on }); } void test_unix_connect_timeout() { // Setup fixture fix{transport_type::unix_socket}; // Run the algorithm. Timeout = operation_aborted without a cancel state auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, error_code(error::connect_timeout)); // Check logging fix.check_log({ // clang-format off {logger::level::info, "Connect: UNIX socket connect failed: Connect timeout. [boost.redis:18]"}, // clang-format on }); } void test_unix_connect_cancel() { // Setup fixture fix{transport_type::unix_socket}; // Run the algorithm. Cancel = operation_aborted with a cancel state auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(asio::error::operation_aborted, fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); // Logging is system-dependent BOOST_TEST_EQ(fix.msgs.size(), 1u); } void test_unix_connect_cancel_edge() { // Setup fixture fix{transport_type::unix_socket}; // Run the algorithm. No error, but cancel state is set auto act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_close); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::none); BOOST_TEST_EQ(act, connect_action_type::unix_socket_connect); act = fix.fsm.resume(error_code(), fix.st, cancellation_type_t::terminal); BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); // Logging is system-dependent BOOST_TEST_EQ(fix.msgs.size(), 1u); } } // namespace int main() { test_tcp_success(); test_tcp_tls_success(); test_tcp_tls_success_reconnect(); test_unix_success(); test_unix_success_close_error(); test_tcp_resolve_error(); test_tcp_resolve_timeout(); test_tcp_resolve_cancel(); test_tcp_resolve_cancel_edge(); test_tcp_connect_error(); test_tcp_connect_timeout(); test_tcp_connect_cancel(); test_tcp_connect_cancel_edge(); test_ssl_handshake_error(); test_ssl_handshake_timeout(); test_ssl_handshake_cancel(); test_ssl_handshake_cancel_edge(); test_unix_connect_error(); test_unix_connect_timeout(); test_unix_connect_cancel(); test_unix_connect_cancel_edge(); return boost::report_errors(); }