From 228b31917c56d5eb42709dbb04cf01730f00ac46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anarthal=20=28Rub=C3=A9n=20P=C3=A9rez=29?= <34971811+anarthal@users.noreply.github.com> Date: Thu, 9 Oct 2025 11:31:36 +0200 Subject: [PATCH] Implements the writer as an FSM and adds tests (#325) * Refactors the writer task into a FSM and adds unit tests. * Adds a testing utility to check logging. Entails no functional change (other than cosmetic word fixes to the logs). --- include/boost/redis/connection.hpp | 50 +-- include/boost/redis/detail/writer_fsm.hpp | 66 ++++ .../boost/redis/impl/connection_logger.ipp | 3 +- .../boost/redis/impl/is_terminal_cancel.hpp | 23 ++ include/boost/redis/impl/reader_fsm.ipp | 6 +- include/boost/redis/impl/writer_fsm.ipp | 78 +++++ include/boost/redis/src.hpp | 1 + test/CMakeLists.txt | 1 + test/Jamfile | 1 + test/sansio_utils.cpp | 53 ++- test/sansio_utils.hpp | 25 +- test/test_connect_fsm.cpp | 117 ++----- test/test_writer_fsm.cpp | 323 ++++++++++++++++++ 13 files changed, 618 insertions(+), 129 deletions(-) create mode 100644 include/boost/redis/detail/writer_fsm.hpp create mode 100644 include/boost/redis/impl/is_terminal_cancel.hpp create mode 100644 include/boost/redis/impl/writer_fsm.ipp create mode 100644 test/test_writer_fsm.cpp diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index ad976557..9eb3ceca 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -212,50 +213,27 @@ struct connection_impl { template struct writer_op { connection_impl* conn_; - asio::coroutine coro{}; + writer_fsm fsm_; + + explicit writer_op(connection_impl& conn) noexcept + : conn_(&conn) + , fsm_(conn.mpx_, conn.logger_) + { } template - void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) + void operator()(Self& self, system::error_code ec = {}, std::size_t bytes_written = 0u) { - ignore_unused(n); + auto act = fsm_.resume(ec, bytes_written, self.get_cancellation_state().cancelled()); - BOOST_ASIO_CORO_REENTER(coro) for (;;) - { - while (conn_->mpx_.prepare_write() != 0) { - BOOST_ASIO_CORO_YIELD + switch (act.type) { + case writer_action_type::done: self.complete(act.ec); return; + case writer_action_type::write: asio::async_write( conn_->stream_, asio::buffer(conn_->mpx_.get_write_buffer()), std::move(self)); - - conn_->logger_.on_write(ec, conn_->mpx_.get_write_buffer().size()); - - if (ec) { - conn_->logger_.trace("writer_op (1)", ec); - self.complete(ec); - return; - } - - conn_->mpx_.commit_write(); - - // Check for cancellations - if (is_cancelled(self)) { - conn_->logger_.trace("writer_op (2): cancelled"); - self.complete(asio::error::operation_aborted); - return; - } - } - - // Wait for data to be available - BOOST_ASIO_CORO_YIELD - conn_->writer_timer_.async_wait(std::move(self)); - - // Check for cancellations - if (is_cancelled(self)) { - conn_->logger_.trace("writer_op (3): cancelled"); - self.complete(asio::error::operation_aborted); return; - } + case writer_action_type::wait: conn_->writer_timer_.async_wait(std::move(self)); return; } } }; @@ -473,7 +451,7 @@ private: auto writer(CompletionToken&& token) { return asio::async_compose( - writer_op{conn_}, + writer_op{*conn_}, std::forward(token), conn_->writer_timer_); } diff --git a/include/boost/redis/detail/writer_fsm.hpp b/include/boost/redis/detail/writer_fsm.hpp new file mode 100644 index 00000000..86f11cc2 --- /dev/null +++ b/include/boost/redis/detail/writer_fsm.hpp @@ -0,0 +1,66 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_WRITER_FSM_HPP +#define BOOST_REDIS_WRITER_FSM_HPP + +#include +#include + +#include + +// Sans-io algorithm for the writer task, as a finite state machine + +namespace boost::redis::detail { + +// Forward decls +class connection_logger; +class multiplexer; + +// What should we do next? +enum class writer_action_type +{ + done, // Call the final handler + write, // Issue a write on the stream + wait, // Wait until there is data to be written +}; + +struct writer_action { + writer_action_type type; + system::error_code ec; + + writer_action(writer_action_type type) noexcept + : type{type} + { } + + writer_action(system::error_code ec) noexcept + : type{writer_action_type::done} + , ec{ec} + { } +}; + +class writer_fsm { + int resume_point_{0}; + multiplexer* mpx_; + connection_logger* logger_; + +public: + writer_fsm(multiplexer& mpx, connection_logger& logger) noexcept + : mpx_(&mpx) + , logger_(&logger) + { } + + writer_action resume( + system::error_code ec, + std::size_t bytes_written, + asio::cancellation_type_t cancel_state); +}; + +} // namespace boost::redis::detail + +#endif // BOOST_REDIS_CONNECTOR_HPP diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index 5f97bffb..9940d528 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -149,10 +149,11 @@ void connection_logger::on_write(system::error_code const& ec, std::size_t n) if (logger_.lvl < logger::level::info) return; - msg_ = "writer_op: "; if (ec) { + msg_ = "Writer task error: "; format_error_code(ec, msg_); } else { + msg_ = "Writer task: "; msg_ += std::to_string(n); msg_ += " bytes written."; } diff --git a/include/boost/redis/impl/is_terminal_cancel.hpp b/include/boost/redis/impl/is_terminal_cancel.hpp new file mode 100644 index 00000000..308e3647 --- /dev/null +++ b/include/boost/redis/impl/is_terminal_cancel.hpp @@ -0,0 +1,23 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_IS_TERMINAL_CANCEL_HPP +#define BOOST_REDIS_IS_TERMINAL_CANCEL_HPP + +#include + +namespace boost::redis::detail { + +constexpr bool is_terminal_cancel(asio::cancellation_type_t cancel_state) +{ + return (cancel_state & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; +} + +} // namespace boost::redis::detail + +#endif \ No newline at end of file diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index aca75069..32894cad 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -7,17 +7,13 @@ #include #include #include +#include #include #include namespace boost::redis::detail { -inline bool is_terminal_cancel(asio::cancellation_type_t value) -{ - return (value & asio::cancellation_type_t::terminal) != asio::cancellation_type_t::none; -} - reader_fsm::reader_fsm(multiplexer& mpx) noexcept : mpx_{&mpx} { } diff --git a/include/boost/redis/impl/writer_fsm.ipp b/include/boost/redis/impl/writer_fsm.ipp new file mode 100644 index 00000000..bb4304d2 --- /dev/null +++ b/include/boost/redis/impl/writer_fsm.ipp @@ -0,0 +1,78 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef BOOST_REDIS_WRITER_FSM_IPP +#define BOOST_REDIS_WRITER_FSM_IPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +namespace boost::redis::detail { + +writer_action writer_fsm::resume( + system::error_code ec, + std::size_t bytes_written, + asio::cancellation_type_t cancel_state) +{ + switch (resume_point_) { + BOOST_REDIS_CORO_INITIAL + + for (;;) { + // Attempt to write while we have requests ready to send + while (mpx_->prepare_write() != 0u) { + // Write + BOOST_REDIS_YIELD(resume_point_, 1, writer_action_type::write) + + // Mark requests as written + if (!ec) + mpx_->commit_write(); + + // Check for cancellations + if (is_terminal_cancel(cancel_state)) { + logger_->trace("Writer task: cancelled (1)."); + return system::error_code(asio::error::operation_aborted); + } + + // Log what we wrote + logger_->on_write(ec, bytes_written); + + // Check for errors + if (ec) { + return ec; + } + } + + // No more requests ready to be written. Wait for more + BOOST_REDIS_YIELD(resume_point_, 2, writer_action_type::wait) + + // Check for cancellations + if (is_terminal_cancel(cancel_state)) { + logger_->trace("Writer task: cancelled (2)."); + return system::error_code(asio::error::operation_aborted); + } + } + } + + // We should never reach here + BOOST_ASSERT(false); + return system::error_code(); +} + +} // namespace boost::redis::detail + +#endif diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index b47fb22a..715fd08a 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 01b49824..cdabb61d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -40,6 +40,7 @@ make_test(test_any_adapter) make_test(test_exec_fsm) make_test(test_log_to_file) make_test(test_conn_logging) +make_test(test_writer_fsm) make_test(test_reader_fsm) make_test(test_connect_fsm) make_test(test_setup_request_utils) diff --git a/test/Jamfile b/test/Jamfile index 43b26625..42245848 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -57,6 +57,7 @@ local tests = test_exec_fsm test_log_to_file test_conn_logging + test_writer_fsm test_reader_fsm test_connect_fsm test_setup_request_utils diff --git a/test/sansio_utils.cpp b/test/sansio_utils.cpp index 24fb3b70..3851a01c 100644 --- a/test/sansio_utils.cpp +++ b/test/sansio_utils.cpp @@ -4,10 +4,34 @@ * accompanying file LICENSE.txt) */ +#include + +#include +#include + #include "sansio_utils.hpp" -#include -#include +#include +#include +#include + +using namespace boost::redis; + +static constexpr const char* to_string(logger::level lvl) +{ + switch (lvl) { + case logger::level::disabled: return "logger::level::disabled"; + case logger::level::emerg: return "logger::level::emerg"; + case logger::level::alert: return "logger::level::alert"; + case logger::level::crit: return "logger::level::crit"; + case logger::level::err: return "logger::level::err"; + case logger::level::warning: return "logger::level::warning"; + case logger::level::notice: return "logger::level::notice"; + case logger::level::info: return "logger::level::info"; + case logger::level::debug: return "logger::level::debug"; + default: return ""; + } +} namespace boost::redis::detail { @@ -22,4 +46,29 @@ void read(multiplexer& mpx, std::string_view data) mpx.commit_read(data.size()); } +// Operators to enable checking logs +bool operator==(const log_message& lhs, const log_message& rhs) noexcept +{ + return lhs.lvl == rhs.lvl && lhs.msg == rhs.msg; +} + +std::ostream& operator<<(std::ostream& os, const log_message& v) +{ + return os << "log_message { .lvl=" << to_string(v.lvl) << ", .msg=" << v.msg << " }"; +} + +log_fixture::log_fixture() +: lgr{logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { + msgs.push_back({lvl, std::string(msg)}); +})} +{ } + +void log_fixture::check_log(std::initializer_list expected, source_location loc) + const +{ + if (!BOOST_TEST_ALL_EQ(expected.begin(), expected.end(), msgs.begin(), msgs.end())) { + std::cerr << "Called from " << loc << std::endl; + } +} + } // namespace boost::redis::detail diff --git a/test/sansio_utils.hpp b/test/sansio_utils.hpp index b630b917..7473a354 100644 --- a/test/sansio_utils.hpp +++ b/test/sansio_utils.hpp @@ -7,6 +7,13 @@ #ifndef BOOST_REDIS_TEST_SANSIO_UTILS_HPP #define BOOST_REDIS_TEST_SANSIO_UTILS_HPP +#include +#include + +#include + +#include +#include #include namespace boost::redis::detail { @@ -23,6 +30,22 @@ class multiplexer; // This is used in the multiplexer tests. void read(multiplexer& mpx, std::string_view data); +// Utilities for checking logs +struct log_message { + logger::level lvl; + std::string msg; +}; + +struct log_fixture { + std::vector msgs; + detail::connection_logger lgr; + + log_fixture(); + void check_log( + std::initializer_list expected, + source_location loc = BOOST_CURRENT_LOCATION) const; +}; + } // namespace boost::redis::detail -#endif // BOOST_REDIS_TEST_SANSIO_UTILS_HPP +#endif // BOOST_REDIS_TEST_SANSIO_UTILS_HPP diff --git a/test/test_connect_fsm.cpp b/test/test_connect_fsm.cpp index 75a259b8..73c5767e 100644 --- a/test/test_connect_fsm.cpp +++ b/test/test_connect_fsm.cpp @@ -16,6 +16,8 @@ #include #include +#include "sansio_utils.hpp" + #include #include #include @@ -61,28 +63,6 @@ static const char* to_string(transport_type type) } } -static const char* to_string(logger::level lvl) -{ - switch (lvl) { - case logger::level::disabled: return "logger::level::disabled"; - case logger::level::emerg: return "logger::level::emerg"; - case logger::level::alert: return "logger::level::alert"; - case logger::level::crit: return "logger::level::crit"; - case logger::level::err: return "logger::level::err"; - case logger::level::warning: return "logger::level::warning"; - case logger::level::notice: return "logger::level::notice"; - case logger::level::info: return "logger::level::info"; - case logger::level::debug: return "logger::level::debug"; - default: return ""; - } -} - -namespace boost::redis { - -std::ostream& operator<<(std::ostream& os, logger::level lvl) { return os << to_string(lvl); } - -} // namespace boost::redis - namespace boost::redis::detail { std::ostream& operator<<(std::ostream& os, connect_action_type type) @@ -122,33 +102,15 @@ auto resolver_data = [] { "1234"); }(); -// For checking logs -struct log_message { - logger::level lvl; - std::string msg; - - friend bool operator==(const log_message& lhs, const log_message& rhs) noexcept - { - return lhs.lvl == rhs.lvl && lhs.msg == rhs.msg; - } - - friend std::ostream& operator<<(std::ostream& os, const log_message& v) - { - return os << "log_message { .lvl=" << v.lvl << ", .msg=" << v.msg << " }"; - } -}; - // Reduce duplication -struct fixture { +struct fixture : detail::log_fixture { config cfg; - std::ostringstream oss{}; - std::vector msgs{}; - detail::connection_logger lgr{ - logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) { - msgs.push_back({lvl, std::string(msg)}); - })}; connect_fsm fsm{cfg, lgr}; redis_stream_state st{}; + + fixture(config&& cfg = {}) + : cfg{std::move(cfg)} + { } }; config make_ssl_config() @@ -183,11 +145,10 @@ void test_tcp_success() BOOST_TEST_NOT(fix.st.ssl_stream_used); // Check logging - const log_message expected[] = { + fix.check_log({ {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Connected to 192.168.10.1:1234" }, - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_tcp_tls_success() @@ -210,12 +171,11 @@ void test_tcp_tls_success() BOOST_TEST(fix.st.ssl_stream_used); // Check logging - const log_message expected[] = { + fix.check_log({ {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Connected to 192.168.10.1:1234" }, {logger::level::info, "Successfully performed SSL handshake" }, - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_tcp_tls_success_reconnect() @@ -241,12 +201,11 @@ void test_tcp_tls_success_reconnect() BOOST_TEST(fix.st.ssl_stream_used); // Check logging - const log_message expected[] = { + fix.check_log({ {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Connected to 192.168.10.1:1234" }, {logger::level::info, "Successfully performed SSL handshake" }, - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_unix_success() @@ -267,10 +226,9 @@ void test_unix_success() BOOST_TEST_NOT(fix.st.ssl_stream_used); // Check logging - const log_message expected[] = { + fix.check_log({ {logger::level::info, "Connected to /run/redis.sock"}, - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } // Close errors are ignored @@ -292,10 +250,9 @@ void test_unix_success_close_error() BOOST_TEST_NOT(fix.st.ssl_stream_used); // Check logging - const log_message expected[] = { + fix.check_log({ {logger::level::info, "Connected to /run/redis.sock"}, - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } // Resolve errors @@ -311,12 +268,11 @@ void test_tcp_resolve_error() BOOST_TEST_EQ(act, error_code(error::empty_field)); // Check logging - const log_message expected[] = { + fix.check_log({ // clang-format off {logger::level::info, "Error resolving the server hostname: Expected field value is empty. [boost.redis:5]"}, // clang-format on - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_tcp_resolve_timeout() @@ -335,12 +291,11 @@ void test_tcp_resolve_timeout() BOOST_TEST_EQ(act, error_code(error::resolve_timeout)); // Check logging - const log_message expected[] = { + fix.check_log({ // clang-format off {logger::level::info, "Error resolving the server hostname: Resolve timeout. [boost.redis:17]"}, // clang-format on - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_tcp_resolve_cancel() @@ -392,13 +347,12 @@ void test_tcp_connect_error() BOOST_TEST_EQ(act, error_code(error::empty_field)); // Check logging - const log_message expected[] = { + fix.check_log({ // clang-format off {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"}, // clang-format on - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_tcp_connect_timeout() @@ -419,13 +373,12 @@ void test_tcp_connect_timeout() BOOST_TEST_EQ(act, error_code(error::connect_timeout)); // Check logging - const log_message expected[] = { + fix.check_log({ // clang-format off {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"}, // clang-format on - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_tcp_connect_cancel() @@ -486,14 +439,13 @@ void test_ssl_handshake_error() BOOST_TEST(fix.st.ssl_stream_used); // Check logging - const log_message expected[] = { + fix.check_log({ // clang-format off {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Connected to 192.168.10.1:1234" }, {logger::level::info, "Failed to perform SSL handshake: Expected field value is empty. [boost.redis:5]"}, // clang-format on - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_ssl_handshake_timeout() @@ -515,14 +467,13 @@ void test_ssl_handshake_timeout() BOOST_TEST(fix.st.ssl_stream_used); // Check logging - const log_message expected[] = { + fix.check_log({ // clang-format off {logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"}, {logger::level::info, "Connected to 192.168.10.1:1234" }, {logger::level::info, "Failed to perform SSL handshake: SSL handshake timeout. [boost.redis:20]"}, // clang-format on - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_ssl_handshake_cancel() @@ -584,12 +535,11 @@ void test_unix_connect_error() BOOST_TEST_EQ(act, error_code(error::empty_field)); // Check logging - const log_message expected[] = { + fix.check_log({ // clang-format off {logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"}, // clang-format on - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_unix_connect_timeout() @@ -606,12 +556,11 @@ void test_unix_connect_timeout() BOOST_TEST_EQ(act, error_code(error::connect_timeout)); // Check logging - const log_message expected[] = { + fix.check_log({ // clang-format off {logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"}, // clang-format on - }; - BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end()); + }); } void test_unix_connect_cancel() diff --git a/test/test_writer_fsm.cpp b/test/test_writer_fsm.cpp new file mode 100644 index 00000000..96e2a9ba --- /dev/null +++ b/test/test_writer_fsm.cpp @@ -0,0 +1,323 @@ +// +// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com), +// Ruben Perez Hidalgo (rubenperez038 at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "sansio_utils.hpp" + +#include +#include + +using namespace boost::redis; +namespace asio = boost::asio; +using detail::writer_fsm; +using detail::multiplexer; +using detail::writer_action_type; +using detail::consume_result; +using detail::writer_action; +using boost::system::error_code; +using boost::asio::cancellation_type_t; +using detail::connection_logger; + +// Operators +static const char* to_string(writer_action_type value) +{ + switch (value) { + case writer_action_type::done: return "writer_action_type::done"; + case writer_action_type::write: return "writer_action_type::write"; + case writer_action_type::wait: return "writer_action_type::wait"; + default: return ""; + } +} + +namespace boost::redis::detail { + +std::ostream& operator<<(std::ostream& os, writer_action_type type) +{ + os << to_string(type); + return os; +} + +bool operator==(const writer_action& lhs, const writer_action& rhs) noexcept +{ + return lhs.type == rhs.type && lhs.ec == rhs.ec; +} + +std::ostream& operator<<(std::ostream& os, const writer_action& act) +{ + os << "writer_action{ .type=" << act.type; + if (act.type == writer_action_type::done) + os << ", .error=" << act.ec; + return os << " }"; +} + +} // namespace boost::redis::detail + +namespace { + +// A helper to create a request and its associated elem +struct test_elem { + request req; + bool done{false}; + std::shared_ptr elm; + + test_elem() + { + // Empty requests are not valid. The request needs to be populated before creating the element + req.push("get", "mykey"); + elm = std::make_shared(req, any_adapter{}); + + elm->set_done_callback([this] { + done = true; + }); + } +}; + +struct fixture : detail::log_fixture { + multiplexer mpx; + writer_fsm fsm{mpx, lgr}; +}; + +// A single request is written, then we wait and repeat +void test_single_request() +{ + // Setup + fixture fix; + test_elem item1, item2; + + // A request arrives before the writer starts + fix.mpx.add(item1.elm); + + // Start. A write is triggered, and the request is marked as staged + auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::write); + BOOST_TEST(item1.elm->is_staged()); + + // The write completes successfully. The request is written, and we go back to sleep. + act = fix.fsm.resume(error_code(), item1.req.payload().size(), cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::wait); + BOOST_TEST(item1.elm->is_written()); + + // Another request arrives + fix.mpx.add(item2.elm); + + // The wait is cancelled to signal we've got a new request + act = fix.fsm.resume(asio::error::operation_aborted, 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::write); + BOOST_TEST(item2.elm->is_staged()); + + // Write successful + act = fix.fsm.resume(error_code(), item2.req.payload().size(), cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::wait); + BOOST_TEST(item2.elm->is_written()); + + // Logs + fix.check_log({ + {logger::level::info, "Writer task: 24 bytes written."}, + {logger::level::info, "Writer task: 24 bytes written."}, + }); +} + +// If a request arrives while we're performing a write, we don't get back to sleep +void test_request_arrives_while_writing() +{ + // Setup + fixture fix; + test_elem item1, item2; + + // A request arrives before the writer starts + fix.mpx.add(item1.elm); + + // Start. A write is triggered, and the request is marked as staged + auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::write); + BOOST_TEST(item1.elm->is_staged()); + + // While the write is outstanding, a new request arrives + fix.mpx.add(item2.elm); + + // The write completes successfully. The request is written, + // and we start writing the new one + act = fix.fsm.resume(error_code(), item1.req.payload().size(), cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::write); + BOOST_TEST(item1.elm->is_written()); + BOOST_TEST(item2.elm->is_staged()); + + // Write successful + act = fix.fsm.resume(error_code(), item2.req.payload().size(), cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::wait); + BOOST_TEST(item2.elm->is_written()); + + // Logs + fix.check_log({ + {logger::level::info, "Writer task: 24 bytes written."}, + {logger::level::info, "Writer task: 24 bytes written."}, + }); +} + +// If there is no request when the writer starts, we wait for it +void test_no_request_at_startup() +{ + // Setup + fixture fix; + test_elem item; + + // Start. There is no request, so we wait + auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::wait); + + // A request arrives + fix.mpx.add(item.elm); + + // The wait is cancelled to signal we've got a new request + act = fix.fsm.resume(asio::error::operation_aborted, 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::write); + BOOST_TEST(item.elm->is_staged()); + + // Write successful + act = fix.fsm.resume(error_code(), item.req.payload().size(), cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::wait); + BOOST_TEST(item.elm->is_written()); + + // Logs + fix.check_log({ + {logger::level::info, "Writer task: 24 bytes written."}, + }); +} + +// A write error makes the writer exit +void test_write_error() +{ + // Setup + fixture fix; + test_elem item; + + // A request arrives before the writer starts + fix.mpx.add(item.elm); + + // Start. A write is triggered, and the request is marked as staged + auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::write); + BOOST_TEST(item.elm->is_staged()); + + // The write completes with an error (possibly with partial success). + // The request is still staged, and the writer exits. + // Use an error we control so we can check logs + act = fix.fsm.resume(error::empty_field, 2u, cancellation_type_t::none); + BOOST_TEST_EQ(act, error_code(error::empty_field)); + BOOST_TEST(item.elm->is_staged()); + + // Logs + fix.check_log({ + {logger::level::info, "Writer task error: Expected field value is empty. [boost.redis:5]"}, + }); +} + +// A write is cancelled +void test_cancel_write() +{ + // Setup + fixture fix; + test_elem item; + + // A request arrives before the writer starts + fix.mpx.add(item.elm); + + // Start. A write is triggered + auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::write); + BOOST_TEST(item.elm->is_staged()); + + // Write cancelled and failed with operation_aborted + act = fix.fsm.resume(asio::error::operation_aborted, 2u, cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + BOOST_TEST(item.elm->is_staged()); + + // Logs + fix.check_log({ + {logger::level::debug, "Writer task: cancelled (1)."}, + }); +} + +// A write is cancelled after completing but before the handler is dispatched +void test_cancel_write_edge() +{ + // Setup + fixture fix; + test_elem item; + + // A request arrives before the writer starts + fix.mpx.add(item.elm); + + // Start. A write is triggered + auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::write); + BOOST_TEST(item.elm->is_staged()); + + // Write cancelled but without error + act = fix.fsm.resume(error_code(), item.req.payload().size(), cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + BOOST_TEST(item.elm->is_written()); + + // Logs + fix.check_log({ + {logger::level::debug, "Writer task: cancelled (1)."}, + }); +} + +// The wait was cancelled because of per-operation cancellation (rather than a notification) +void test_cancel_wait() +{ + // Setup + fixture fix; + test_elem item; + + // Start. There is no request, so we wait + auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none); + BOOST_TEST_EQ(act, writer_action_type::wait); + + // Sanity check: the writer doesn't touch the multiplexer after a cancellation + fix.mpx.add(item.elm); + + // Cancel the wait, setting the cancellation state + act = fix.fsm.resume(asio::error::operation_aborted, 0u, asio::cancellation_type_t::terminal); + BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted)); + BOOST_TEST(item.elm->is_waiting()); + + // Logs + fix.check_log({ + {logger::level::debug, "Writer task: cancelled (2)."}, + }); +} + +} // namespace + +int main() +{ + test_single_request(); + test_request_arrives_while_writing(); + test_no_request_at_startup(); + + test_write_error(); + + test_cancel_write(); + test_cancel_write_edge(); + test_cancel_wait(); + + return boost::report_errors(); +}