// // Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include #include #include #include #include #include #include "common.hpp" namespace net = boost::asio; namespace redis = boost::redis; using boost::system::error_code; using net::cancellation_type_t; using redis::detail::reader_fsm; using redis::detail::multiplexer; using redis::detail::read_buffer; using redis::generic_response; using redis::any_adapter; using action = redis::detail::reader_fsm::action; namespace boost::redis::detail { extern auto to_string(reader_fsm::action::type t) noexcept -> char const*; std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t) { os << to_string(t); return os; } } // namespace boost::redis::detail // Operators namespace { void test_push() { read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n" ">1\r\n+msg2 \r\n" ">1\r\n+msg3 \r\n"; append_read_data(rbuf, payload); // Deliver the 1st push act = fsm.resume(payload.size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, 11u); BOOST_TEST_EQ(act.ec_, error_code()); // Deliver the 2st push act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, 12u); BOOST_TEST_EQ(act.ec_, error_code()); // Deliver the 3rd push act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, 13u); BOOST_TEST_EQ(act.ec_, error_code()); // All pushes were delivered so the fsm should demand more data act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::append_some); BOOST_TEST_EQ(act.ec_, error_code()); } void test_read_needs_more() { read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::append_some); // Split the incoming message in three random parts and deliver // them to the reader individually. std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"}; // Passes the first part to the fsm. append_read_data(rbuf, msg[0]); act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::needs_more); BOOST_TEST_EQ(act.ec_, error_code()); // Passes the second part to the fsm. append_read_data(rbuf, msg[1]); act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::needs_more); BOOST_TEST_EQ(act.ec_, error_code()); // Passes the third and last part to the fsm, next it should ask us // to deliver the message. append_read_data(rbuf, msg[2]); act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size()); BOOST_TEST_EQ(act.ec_, error_code()); // All pushes were delivered so the fsm should demand more data act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::append_some); BOOST_TEST_EQ(act.ec_, error_code()); } void test_read_error() { read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; append_read_data(rbuf, payload); // Deliver the data act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); BOOST_TEST_EQ(act.ec_, error_code()); // Finish act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } void test_parse_error() { read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. std::string const payload = ">a\r\n"; append_read_data(rbuf, payload); // Deliver the data act = fsm.resume(payload.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); BOOST_TEST_EQ(act.ec_, error_code()); // Finish act = fsm.resume(0, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number}); } void test_push_deliver_error() { read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; append_read_data(rbuf, payload); // Deliver the data act = fsm.resume(payload.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.ec_, error_code()); // Resumes from notifying a push with an error. act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); // Finish act = fsm.resume(0, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } void test_max_read_buffer_size() { read_buffer rbuf; rbuf.set_config({5, 7}); multiplexer mpx; generic_response resp; mpx.set_receive_adapter(any_adapter{resp}); reader_fsm fsm{rbuf, mpx}; error_code ec; action act; // Initiate act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::append_some); // Passes the first part to the fsm. std::string const part1 = ">3\r\n"; append_read_data(rbuf, part1); act = fsm.resume(part1.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); BOOST_TEST_EQ(act.ec_, error_code()); act = fsm.resume({}, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); } } // namespace int main() { test_max_read_buffer_size(); test_push_deliver_error(); test_read_needs_more(); test_push(); test_read_error(); test_parse_error(); return boost::report_errors(); }