2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Moves logging into reader_fsm (#332)

* Removes logging all the reader actions, and logs specific messages inside the reader_fsm instead
* Adds constructors to reader actions
* Makes reader_fsm use connection_state
* Refactors reader_fsm tests
* Moves exec_fsm action printing to test code
This commit is contained in:
Anarthal (Rubén Pérez)
2025-10-15 17:36:54 +02:00
committed by GitHub
parent f683e368dd
commit da09787d29
7 changed files with 330 additions and 223 deletions

View File

@@ -12,17 +12,17 @@
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include <boost/assert.hpp>
#include "sansio_utils.hpp"
#include <cstddef>
#include <memory>
#include <ostream>
#include <utility>
#include "sansio_utils.hpp"
using namespace boost::redis;
namespace asio = boost::asio;
using detail::exec_fsm;
@@ -33,11 +33,24 @@ using detail::exec_action;
using boost::system::error_code;
using boost::asio::cancellation_type_t;
#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \
case exec_action_type::elem: return "exec_action_type::" #elem
static auto to_string(exec_action_type t) noexcept -> char const*
{
switch (t) {
BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation);
BOOST_REDIS_EXEC_SWITCH_CASE(immediate);
BOOST_REDIS_EXEC_SWITCH_CASE(done);
BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer);
BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response);
default: return "exec_action_type::<invalid type>";
}
}
// Operators
namespace boost::redis::detail {
extern auto to_string(exec_action_type t) noexcept -> char const*;
std::ostream& operator<<(std::ostream& os, exec_action_type type)
{
os << to_string(type);

View File

@@ -5,13 +5,19 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include "sansio_utils.hpp"
#include <string_view>
namespace net = boost::asio;
@@ -23,17 +29,38 @@ using redis::detail::multiplexer;
using redis::generic_response;
using redis::any_adapter;
using redis::config;
using redis::detail::connection_state;
using action = redis::detail::reader_fsm::action;
using redis::logger;
// Operators
static const char* to_string(action::type type)
{
switch (type) {
case action::type::read_some: return "action::type::read_some";
case action::type::notify_push_receiver: return "action::type::notify_push_receiver";
case action::type::done: return "action::type::done";
default: return "<unknown action::type>";
}
}
namespace boost::redis::detail {
extern auto to_string(reader_fsm::action::type t) noexcept -> char const*;
std::ostream& operator<<(std::ostream& os, action::type type) { return os << to_string(type); }
std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t)
bool operator==(const action& lhs, const action& rhs) noexcept
{
os << to_string(t);
return os;
return lhs.type_ == rhs.type_ && lhs.push_size_ == rhs.push_size_ && lhs.ec_ == rhs.ec_;
}
std::ostream& operator<<(std::ostream& os, const action& act)
{
os << "action{ .type=" << act.type_;
if (act.type_ == action::type::done)
os << ", .error=" << act.ec_;
else if (act.type_ == action::type::notify_push_receiver)
os << ", .push_size=" << act.push_size_;
return os << " }";
}
} // namespace boost::redis::detail
@@ -53,17 +80,20 @@ void copy_to(multiplexer& mpx, std::string_view data)
std::copy(data.cbegin(), data.cend(), buffer.begin());
}
struct fixture : redis::detail::log_fixture {
connection_state st{make_logger()};
generic_response resp;
fixture() { st.mpx.set_receive_adapter(any_adapter{resp}); }
};
void test_push()
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{mpx};
error_code ec;
action act;
fixture fix;
reader_fsm fsm;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// The fsm is asking for data.
@@ -72,43 +102,39 @@ void test_push()
">1\r\n+msg2 \r\n"
">1\r\n+msg3 \r\n";
copy_to(mpx, payload);
copy_to(fix.st.mpx, payload);
// Deliver the 1st push
act = fsm.resume(payload.size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, 11u);
BOOST_TEST_EQ(act.ec_, error_code());
act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::notify_push_receiver(11u));
// Deliver the 2st push
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, 12u);
BOOST_TEST_EQ(act.ec_, error_code());
act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::notify_push_receiver(12u));
// Deliver the 3rd push
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, 13u);
BOOST_TEST_EQ(act.ec_, error_code());
act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::notify_push_receiver(13u));
// All pushes were delivered so the fsm should demand more data
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act.ec_, error_code());
act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 36 bytes read"},
{logger::level::debug, "Reader task: issuing read" },
});
}
void test_read_needs_more()
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{mpx};
error_code ec;
action act;
fixture fix;
reader_fsm fsm;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// Split the incoming message in three random parts and deliver
@@ -116,166 +142,209 @@ void test_read_needs_more()
std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"};
// Passes the first part to the fsm.
copy_to(mpx, msg[0]);
act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::needs_more);
BOOST_TEST_EQ(act.ec_, error_code());
copy_to(fix.st.mpx, msg[0]);
act = fsm.resume(fix.st, msg[0].size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// Passes the second part to the fsm.
copy_to(mpx, msg[1]);
act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::needs_more);
BOOST_TEST_EQ(act.ec_, error_code());
copy_to(fix.st.mpx, msg[1]);
act = fsm.resume(fix.st, msg[1].size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// Passes the third and last part to the fsm, next it should ask us
// to deliver the message.
copy_to(mpx, msg[2]);
act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size());
BOOST_TEST_EQ(act.ec_, error_code());
copy_to(fix.st.mpx, msg[2]);
act = fsm.resume(fix.st, msg[2].size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::notify_push_receiver(msg[0].size() + msg[1].size() + msg[2].size()));
// All pushes were delivered so the fsm should demand more data
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act.ec_, error_code());
act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 3 bytes read" },
{logger::level::debug, "Reader task: incomplete message received"},
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read" },
{logger::level::debug, "Reader task: incomplete message received"},
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read" },
{logger::level::debug, "Reader task: issuing read" },
});
}
void test_read_error()
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{mpx};
error_code ec;
action act;
fixture fix;
reader_fsm fsm;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// The fsm is asking for data.
std::string const payload = ">1\r\n+msg1\r\n";
copy_to(mpx, payload);
copy_to(fix.st.mpx, payload);
// Deliver the data
act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted});
act = fsm.resume(fix.st, payload.size(), {redis::error::empty_field}, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{redis::error::empty_field});
// Check logging
fix.check_log({
// clang-format off
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read, error: Expected field value is empty. [boost.redis:5]"},
// clang-format on
});
}
void test_parse_error()
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{mpx};
error_code ec;
action act;
fixture fix;
reader_fsm fsm;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// The fsm is asking for data.
std::string const payload = ">a\r\n";
copy_to(mpx, payload);
copy_to(fix.st.mpx, payload);
// Deliver the data
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number});
act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{redis::error::not_a_number});
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read"},
{logger::level::debug, "Reader task: 4 bytes read"},
{logger::level::debug,
"Reader task: error processing message: Can't convert string to number (maybe forgot to "
"upgrade to RESP3?). [boost.redis:2]" },
});
}
void test_push_deliver_error()
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{mpx};
error_code ec;
action act;
fixture fix;
reader_fsm fsm;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// The fsm is asking for data.
std::string const payload = ">1\r\n+msg1\r\n";
copy_to(mpx, payload);
copy_to(fix.st.mpx, payload);
// Deliver the data
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.ec_, error_code());
act = fsm.resume(fix.st, payload.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act, action::notify_push_receiver(11u));
// Resumes from notifying a push with an error.
act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted});
act = fsm.resume(fix.st, 0, redis::error::empty_field, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{redis::error::empty_field});
// Check logging
fix.check_log({
// clang-format off
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read" },
{logger::level::debug, "Reader task: error notifying push receiver: Expected field value is empty. [boost.redis:5]"},
// clang-format on
});
}
void test_max_read_buffer_size()
{
config cfg;
cfg.read_buffer_append_size = 5;
cfg.max_read_size = 7;
multiplexer mpx;
mpx.set_config(cfg);
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{mpx};
error_code ec;
action act;
fixture fix;
fix.st.cfg.read_buffer_append_size = 5;
fix.st.cfg.max_read_size = 7;
fix.st.mpx.set_config(fix.st.cfg);
reader_fsm fsm;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// Passes the first part to the fsm.
std::string const part1 = ">3\r\n";
copy_to(mpx, part1);
act = fsm.resume(part1.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size);
copy_to(fix.st.mpx, part1);
act = fsm.resume(fix.st, part1.size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(redis::error::exceeds_maximum_read_buffer_size));
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 4 bytes read" },
{logger::level::debug, "Reader task: incomplete message received"},
{logger::level::debug,
"Reader task: error in prepare_read: Reading data from the socket would exceed the maximum "
"size allowed of the read buffer. [boost.redis:26]" },
});
}
// Cancellations
void test_cancel_after_read()
void test_cancel_read()
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{mpx};
error_code ec;
action act;
fixture fix;
reader_fsm fsm;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// The read was cancelled (maybe after delivering some bytes)
constexpr std::string_view payload = ">1\r\n";
copy_to(fix.st.mpx, payload);
act = fsm.resume(
fix.st,
payload.size(),
net::error::operation_aborted,
cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(net::error::operation_aborted));
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: cancelled (1)"},
});
}
void test_cancel_read_edge()
{
fixture fix;
reader_fsm fsm;
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
// Deliver a push, and notify a cancellation.
// This can happen if the cancellation signal arrives before the read handler runs
constexpr std::string_view payload = ">1\r\n+msg1\r\n";
copy_to(mpx, payload);
act = fsm.resume(payload.size(), ec, cancellation_type_t::terminal);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.push_size_, 0u);
BOOST_TEST_EQ(act.ec_, net::error::operation_aborted);
copy_to(fix.st.mpx, payload);
act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(net::error::operation_aborted));
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: cancelled (1)"},
});
}
void test_cancel_after_push_delivery()
void test_cancel_push_delivery()
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{mpx};
error_code ec;
action act;
fixture fix;
reader_fsm fsm;
// Initiate
act = fsm.resume(0, ec, cancellation_type_t::none);
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// The fsm is asking for data.
@@ -283,20 +352,55 @@ void test_cancel_after_push_delivery()
">1\r\n+msg1\r\n"
">1\r\n+msg2 \r\n";
copy_to(mpx, payload);
copy_to(fix.st.mpx, payload);
// Deliver the 1st push
act = fsm.resume(payload.size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, 11u);
BOOST_TEST_EQ(act.ec_, error_code());
act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::notify_push_receiver(11u));
// We got a cancellation while delivering it
act = fsm.resume(fix.st, 0, net::error::operation_aborted, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(net::error::operation_aborted));
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 23 bytes read"},
{logger::level::debug, "Reader task: cancelled (2)"},
});
}
void test_cancel_push_delivery_edge()
{
fixture fix;
reader_fsm fsm;
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// The fsm is asking for data.
constexpr std::string_view payload =
">1\r\n+msg1\r\n"
">1\r\n+msg2 \r\n";
copy_to(fix.st.mpx, payload);
// Deliver the 1st push
act = fsm.resume(fix.st, payload.size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::notify_push_receiver(11u));
// We got a cancellation after delivering it.
// This can happen if the cancellation signal arrives before the channel send handler runs
act = fsm.resume(0, ec, cancellation_type_t::terminal);
BOOST_TEST_EQ(act.type_, action::type::done);
BOOST_TEST_EQ(act.push_size_, 0u);
BOOST_TEST_EQ(act.ec_, net::error::operation_aborted);
act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(net::error::operation_aborted));
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 23 bytes read"},
{logger::level::debug, "Reader task: cancelled (2)"},
});
}
} // namespace
@@ -311,8 +415,10 @@ int main()
test_push_deliver_error();
test_max_read_buffer_size();
test_cancel_after_read();
test_cancel_after_push_delivery();
test_cancel_read();
test_cancel_read_edge();
test_cancel_push_delivery();
test_cancel_push_delivery_edge();
return boost::report_errors();
}