mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Implements the writer as an FSM and adds tests (#325)
* Refactors the writer task into a FSM and adds unit tests. * Adds a testing utility to check logging. Entails no functional change (other than cosmetic word fixes to the logs).
This commit is contained in:
committed by
GitHub
parent
d3e335942f
commit
228b31917c
@@ -40,6 +40,7 @@ make_test(test_any_adapter)
|
||||
make_test(test_exec_fsm)
|
||||
make_test(test_log_to_file)
|
||||
make_test(test_conn_logging)
|
||||
make_test(test_writer_fsm)
|
||||
make_test(test_reader_fsm)
|
||||
make_test(test_connect_fsm)
|
||||
make_test(test_setup_request_utils)
|
||||
|
||||
@@ -57,6 +57,7 @@ local tests =
|
||||
test_exec_fsm
|
||||
test_log_to_file
|
||||
test_conn_logging
|
||||
test_writer_fsm
|
||||
test_reader_fsm
|
||||
test_connect_fsm
|
||||
test_setup_request_utils
|
||||
|
||||
@@ -4,10 +4,34 @@
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <boost/redis/detail/multiplexer.hpp>
|
||||
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
#include <boost/core/lightweight_test.hpp>
|
||||
|
||||
#include "sansio_utils.hpp"
|
||||
|
||||
#include <boost/redis/detail/multiplexer.hpp>
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
#include <initializer_list>
|
||||
#include <iostream>
|
||||
#include <ostream>
|
||||
|
||||
using namespace boost::redis;
|
||||
|
||||
static constexpr const char* to_string(logger::level lvl)
|
||||
{
|
||||
switch (lvl) {
|
||||
case logger::level::disabled: return "logger::level::disabled";
|
||||
case logger::level::emerg: return "logger::level::emerg";
|
||||
case logger::level::alert: return "logger::level::alert";
|
||||
case logger::level::crit: return "logger::level::crit";
|
||||
case logger::level::err: return "logger::level::err";
|
||||
case logger::level::warning: return "logger::level::warning";
|
||||
case logger::level::notice: return "logger::level::notice";
|
||||
case logger::level::info: return "logger::level::info";
|
||||
case logger::level::debug: return "logger::level::debug";
|
||||
default: return "<unknown logger::level>";
|
||||
}
|
||||
}
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
@@ -22,4 +46,29 @@ void read(multiplexer& mpx, std::string_view data)
|
||||
mpx.commit_read(data.size());
|
||||
}
|
||||
|
||||
// Operators to enable checking logs
|
||||
bool operator==(const log_message& lhs, const log_message& rhs) noexcept
|
||||
{
|
||||
return lhs.lvl == rhs.lvl && lhs.msg == rhs.msg;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const log_message& v)
|
||||
{
|
||||
return os << "log_message { .lvl=" << to_string(v.lvl) << ", .msg=" << v.msg << " }";
|
||||
}
|
||||
|
||||
log_fixture::log_fixture()
|
||||
: lgr{logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) {
|
||||
msgs.push_back({lvl, std::string(msg)});
|
||||
})}
|
||||
{ }
|
||||
|
||||
void log_fixture::check_log(std::initializer_list<const log_message> expected, source_location loc)
|
||||
const
|
||||
{
|
||||
if (!BOOST_TEST_ALL_EQ(expected.begin(), expected.end(), msgs.begin(), msgs.end())) {
|
||||
std::cerr << "Called from " << loc << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace boost::redis::detail
|
||||
|
||||
@@ -7,6 +7,13 @@
|
||||
#ifndef BOOST_REDIS_TEST_SANSIO_UTILS_HPP
|
||||
#define BOOST_REDIS_TEST_SANSIO_UTILS_HPP
|
||||
|
||||
#include <boost/redis/detail/connection_logger.hpp>
|
||||
#include <boost/redis/logger.hpp>
|
||||
|
||||
#include <boost/assert/source_location.hpp>
|
||||
|
||||
#include <initializer_list>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
namespace boost::redis::detail {
|
||||
@@ -23,6 +30,22 @@ class multiplexer;
|
||||
// This is used in the multiplexer tests.
|
||||
void read(multiplexer& mpx, std::string_view data);
|
||||
|
||||
// Utilities for checking logs
|
||||
struct log_message {
|
||||
logger::level lvl;
|
||||
std::string msg;
|
||||
};
|
||||
|
||||
struct log_fixture {
|
||||
std::vector<log_message> msgs;
|
||||
detail::connection_logger lgr;
|
||||
|
||||
log_fixture();
|
||||
void check_log(
|
||||
std::initializer_list<const log_message> expected,
|
||||
source_location loc = BOOST_CURRENT_LOCATION) const;
|
||||
};
|
||||
|
||||
} // namespace boost::redis::detail
|
||||
|
||||
#endif // BOOST_REDIS_TEST_SANSIO_UTILS_HPP
|
||||
#endif // BOOST_REDIS_TEST_SANSIO_UTILS_HPP
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/core/lightweight_test.hpp>
|
||||
|
||||
#include "sansio_utils.hpp"
|
||||
|
||||
#include <iterator>
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
@@ -61,28 +63,6 @@ static const char* to_string(transport_type type)
|
||||
}
|
||||
}
|
||||
|
||||
static const char* to_string(logger::level lvl)
|
||||
{
|
||||
switch (lvl) {
|
||||
case logger::level::disabled: return "logger::level::disabled";
|
||||
case logger::level::emerg: return "logger::level::emerg";
|
||||
case logger::level::alert: return "logger::level::alert";
|
||||
case logger::level::crit: return "logger::level::crit";
|
||||
case logger::level::err: return "logger::level::err";
|
||||
case logger::level::warning: return "logger::level::warning";
|
||||
case logger::level::notice: return "logger::level::notice";
|
||||
case logger::level::info: return "logger::level::info";
|
||||
case logger::level::debug: return "logger::level::debug";
|
||||
default: return "<unknown logger::level>";
|
||||
}
|
||||
}
|
||||
|
||||
namespace boost::redis {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, logger::level lvl) { return os << to_string(lvl); }
|
||||
|
||||
} // namespace boost::redis
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, connect_action_type type)
|
||||
@@ -122,33 +102,15 @@ auto resolver_data = [] {
|
||||
"1234");
|
||||
}();
|
||||
|
||||
// For checking logs
|
||||
struct log_message {
|
||||
logger::level lvl;
|
||||
std::string msg;
|
||||
|
||||
friend bool operator==(const log_message& lhs, const log_message& rhs) noexcept
|
||||
{
|
||||
return lhs.lvl == rhs.lvl && lhs.msg == rhs.msg;
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const log_message& v)
|
||||
{
|
||||
return os << "log_message { .lvl=" << v.lvl << ", .msg=" << v.msg << " }";
|
||||
}
|
||||
};
|
||||
|
||||
// Reduce duplication
|
||||
struct fixture {
|
||||
struct fixture : detail::log_fixture {
|
||||
config cfg;
|
||||
std::ostringstream oss{};
|
||||
std::vector<log_message> msgs{};
|
||||
detail::connection_logger lgr{
|
||||
logger(logger::level::debug, [&](logger::level lvl, std::string_view msg) {
|
||||
msgs.push_back({lvl, std::string(msg)});
|
||||
})};
|
||||
connect_fsm fsm{cfg, lgr};
|
||||
redis_stream_state st{};
|
||||
|
||||
fixture(config&& cfg = {})
|
||||
: cfg{std::move(cfg)}
|
||||
{ }
|
||||
};
|
||||
|
||||
config make_ssl_config()
|
||||
@@ -183,11 +145,10 @@ void test_tcp_success()
|
||||
BOOST_TEST_NOT(fix.st.ssl_stream_used);
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
|
||||
{logger::level::info, "Connected to 192.168.10.1:1234" },
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_tcp_tls_success()
|
||||
@@ -210,12 +171,11 @@ void test_tcp_tls_success()
|
||||
BOOST_TEST(fix.st.ssl_stream_used);
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
|
||||
{logger::level::info, "Connected to 192.168.10.1:1234" },
|
||||
{logger::level::info, "Successfully performed SSL handshake" },
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_tcp_tls_success_reconnect()
|
||||
@@ -241,12 +201,11 @@ void test_tcp_tls_success_reconnect()
|
||||
BOOST_TEST(fix.st.ssl_stream_used);
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
|
||||
{logger::level::info, "Connected to 192.168.10.1:1234" },
|
||||
{logger::level::info, "Successfully performed SSL handshake" },
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_unix_success()
|
||||
@@ -267,10 +226,9 @@ void test_unix_success()
|
||||
BOOST_TEST_NOT(fix.st.ssl_stream_used);
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
{logger::level::info, "Connected to /run/redis.sock"},
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
// Close errors are ignored
|
||||
@@ -292,10 +250,9 @@ void test_unix_success_close_error()
|
||||
BOOST_TEST_NOT(fix.st.ssl_stream_used);
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
{logger::level::info, "Connected to /run/redis.sock"},
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
// Resolve errors
|
||||
@@ -311,12 +268,11 @@ void test_tcp_resolve_error()
|
||||
BOOST_TEST_EQ(act, error_code(error::empty_field));
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
// clang-format off
|
||||
{logger::level::info, "Error resolving the server hostname: Expected field value is empty. [boost.redis:5]"},
|
||||
// clang-format on
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_tcp_resolve_timeout()
|
||||
@@ -335,12 +291,11 @@ void test_tcp_resolve_timeout()
|
||||
BOOST_TEST_EQ(act, error_code(error::resolve_timeout));
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
// clang-format off
|
||||
{logger::level::info, "Error resolving the server hostname: Resolve timeout. [boost.redis:17]"},
|
||||
// clang-format on
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_tcp_resolve_cancel()
|
||||
@@ -392,13 +347,12 @@ void test_tcp_connect_error()
|
||||
BOOST_TEST_EQ(act, error_code(error::empty_field));
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
// clang-format off
|
||||
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
|
||||
{logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"},
|
||||
// clang-format on
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_tcp_connect_timeout()
|
||||
@@ -419,13 +373,12 @@ void test_tcp_connect_timeout()
|
||||
BOOST_TEST_EQ(act, error_code(error::connect_timeout));
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
// clang-format off
|
||||
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
|
||||
{logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"},
|
||||
// clang-format on
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_tcp_connect_cancel()
|
||||
@@ -486,14 +439,13 @@ void test_ssl_handshake_error()
|
||||
BOOST_TEST(fix.st.ssl_stream_used);
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
// clang-format off
|
||||
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
|
||||
{logger::level::info, "Connected to 192.168.10.1:1234" },
|
||||
{logger::level::info, "Failed to perform SSL handshake: Expected field value is empty. [boost.redis:5]"},
|
||||
// clang-format on
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_ssl_handshake_timeout()
|
||||
@@ -515,14 +467,13 @@ void test_ssl_handshake_timeout()
|
||||
BOOST_TEST(fix.st.ssl_stream_used);
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
// clang-format off
|
||||
{logger::level::info, "Resolve results: 192.168.10.1:1234, 192.168.10.2:1235"},
|
||||
{logger::level::info, "Connected to 192.168.10.1:1234" },
|
||||
{logger::level::info, "Failed to perform SSL handshake: SSL handshake timeout. [boost.redis:20]"},
|
||||
// clang-format on
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_ssl_handshake_cancel()
|
||||
@@ -584,12 +535,11 @@ void test_unix_connect_error()
|
||||
BOOST_TEST_EQ(act, error_code(error::empty_field));
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
// clang-format off
|
||||
{logger::level::info, "Failed to connect to the server: Expected field value is empty. [boost.redis:5]"},
|
||||
// clang-format on
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_unix_connect_timeout()
|
||||
@@ -606,12 +556,11 @@ void test_unix_connect_timeout()
|
||||
BOOST_TEST_EQ(act, error_code(error::connect_timeout));
|
||||
|
||||
// Check logging
|
||||
const log_message expected[] = {
|
||||
fix.check_log({
|
||||
// clang-format off
|
||||
{logger::level::info, "Failed to connect to the server: Connect timeout. [boost.redis:18]"},
|
||||
// clang-format on
|
||||
};
|
||||
BOOST_TEST_ALL_EQ(std::begin(expected), std::end(expected), fix.msgs.begin(), fix.msgs.end());
|
||||
});
|
||||
}
|
||||
|
||||
void test_unix_connect_cancel()
|
||||
|
||||
323
test/test_writer_fsm.cpp
Normal file
323
test/test_writer_fsm.cpp
Normal file
@@ -0,0 +1,323 @@
|
||||
//
|
||||
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
|
||||
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
|
||||
//
|
||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
|
||||
#include <boost/redis/detail/connection_logger.hpp>
|
||||
#include <boost/redis/detail/multiplexer.hpp>
|
||||
#include <boost/redis/detail/writer_fsm.hpp>
|
||||
#include <boost/redis/logger.hpp>
|
||||
#include <boost/redis/request.hpp>
|
||||
|
||||
#include <boost/asio/cancellation_type.hpp>
|
||||
#include <boost/asio/error.hpp>
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/core/lightweight_test.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include "sansio_utils.hpp"
|
||||
|
||||
#include <memory>
|
||||
#include <ostream>
|
||||
|
||||
using namespace boost::redis;
|
||||
namespace asio = boost::asio;
|
||||
using detail::writer_fsm;
|
||||
using detail::multiplexer;
|
||||
using detail::writer_action_type;
|
||||
using detail::consume_result;
|
||||
using detail::writer_action;
|
||||
using boost::system::error_code;
|
||||
using boost::asio::cancellation_type_t;
|
||||
using detail::connection_logger;
|
||||
|
||||
// Operators
|
||||
static const char* to_string(writer_action_type value)
|
||||
{
|
||||
switch (value) {
|
||||
case writer_action_type::done: return "writer_action_type::done";
|
||||
case writer_action_type::write: return "writer_action_type::write";
|
||||
case writer_action_type::wait: return "writer_action_type::wait";
|
||||
default: return "<unknown writer_action_type>";
|
||||
}
|
||||
}
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, writer_action_type type)
|
||||
{
|
||||
os << to_string(type);
|
||||
return os;
|
||||
}
|
||||
|
||||
bool operator==(const writer_action& lhs, const writer_action& rhs) noexcept
|
||||
{
|
||||
return lhs.type == rhs.type && lhs.ec == rhs.ec;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const writer_action& act)
|
||||
{
|
||||
os << "writer_action{ .type=" << act.type;
|
||||
if (act.type == writer_action_type::done)
|
||||
os << ", .error=" << act.ec;
|
||||
return os << " }";
|
||||
}
|
||||
|
||||
} // namespace boost::redis::detail
|
||||
|
||||
namespace {
|
||||
|
||||
// A helper to create a request and its associated elem
|
||||
struct test_elem {
|
||||
request req;
|
||||
bool done{false};
|
||||
std::shared_ptr<multiplexer::elem> elm;
|
||||
|
||||
test_elem()
|
||||
{
|
||||
// Empty requests are not valid. The request needs to be populated before creating the element
|
||||
req.push("get", "mykey");
|
||||
elm = std::make_shared<multiplexer::elem>(req, any_adapter{});
|
||||
|
||||
elm->set_done_callback([this] {
|
||||
done = true;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
struct fixture : detail::log_fixture {
|
||||
multiplexer mpx;
|
||||
writer_fsm fsm{mpx, lgr};
|
||||
};
|
||||
|
||||
// A single request is written, then we wait and repeat
|
||||
void test_single_request()
|
||||
{
|
||||
// Setup
|
||||
fixture fix;
|
||||
test_elem item1, item2;
|
||||
|
||||
// A request arrives before the writer starts
|
||||
fix.mpx.add(item1.elm);
|
||||
|
||||
// Start. A write is triggered, and the request is marked as staged
|
||||
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::write);
|
||||
BOOST_TEST(item1.elm->is_staged());
|
||||
|
||||
// The write completes successfully. The request is written, and we go back to sleep.
|
||||
act = fix.fsm.resume(error_code(), item1.req.payload().size(), cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::wait);
|
||||
BOOST_TEST(item1.elm->is_written());
|
||||
|
||||
// Another request arrives
|
||||
fix.mpx.add(item2.elm);
|
||||
|
||||
// The wait is cancelled to signal we've got a new request
|
||||
act = fix.fsm.resume(asio::error::operation_aborted, 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::write);
|
||||
BOOST_TEST(item2.elm->is_staged());
|
||||
|
||||
// Write successful
|
||||
act = fix.fsm.resume(error_code(), item2.req.payload().size(), cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::wait);
|
||||
BOOST_TEST(item2.elm->is_written());
|
||||
|
||||
// Logs
|
||||
fix.check_log({
|
||||
{logger::level::info, "Writer task: 24 bytes written."},
|
||||
{logger::level::info, "Writer task: 24 bytes written."},
|
||||
});
|
||||
}
|
||||
|
||||
// If a request arrives while we're performing a write, we don't get back to sleep
|
||||
void test_request_arrives_while_writing()
|
||||
{
|
||||
// Setup
|
||||
fixture fix;
|
||||
test_elem item1, item2;
|
||||
|
||||
// A request arrives before the writer starts
|
||||
fix.mpx.add(item1.elm);
|
||||
|
||||
// Start. A write is triggered, and the request is marked as staged
|
||||
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::write);
|
||||
BOOST_TEST(item1.elm->is_staged());
|
||||
|
||||
// While the write is outstanding, a new request arrives
|
||||
fix.mpx.add(item2.elm);
|
||||
|
||||
// The write completes successfully. The request is written,
|
||||
// and we start writing the new one
|
||||
act = fix.fsm.resume(error_code(), item1.req.payload().size(), cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::write);
|
||||
BOOST_TEST(item1.elm->is_written());
|
||||
BOOST_TEST(item2.elm->is_staged());
|
||||
|
||||
// Write successful
|
||||
act = fix.fsm.resume(error_code(), item2.req.payload().size(), cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::wait);
|
||||
BOOST_TEST(item2.elm->is_written());
|
||||
|
||||
// Logs
|
||||
fix.check_log({
|
||||
{logger::level::info, "Writer task: 24 bytes written."},
|
||||
{logger::level::info, "Writer task: 24 bytes written."},
|
||||
});
|
||||
}
|
||||
|
||||
// If there is no request when the writer starts, we wait for it
|
||||
void test_no_request_at_startup()
|
||||
{
|
||||
// Setup
|
||||
fixture fix;
|
||||
test_elem item;
|
||||
|
||||
// Start. There is no request, so we wait
|
||||
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::wait);
|
||||
|
||||
// A request arrives
|
||||
fix.mpx.add(item.elm);
|
||||
|
||||
// The wait is cancelled to signal we've got a new request
|
||||
act = fix.fsm.resume(asio::error::operation_aborted, 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::write);
|
||||
BOOST_TEST(item.elm->is_staged());
|
||||
|
||||
// Write successful
|
||||
act = fix.fsm.resume(error_code(), item.req.payload().size(), cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::wait);
|
||||
BOOST_TEST(item.elm->is_written());
|
||||
|
||||
// Logs
|
||||
fix.check_log({
|
||||
{logger::level::info, "Writer task: 24 bytes written."},
|
||||
});
|
||||
}
|
||||
|
||||
// A write error makes the writer exit
|
||||
void test_write_error()
|
||||
{
|
||||
// Setup
|
||||
fixture fix;
|
||||
test_elem item;
|
||||
|
||||
// A request arrives before the writer starts
|
||||
fix.mpx.add(item.elm);
|
||||
|
||||
// Start. A write is triggered, and the request is marked as staged
|
||||
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::write);
|
||||
BOOST_TEST(item.elm->is_staged());
|
||||
|
||||
// The write completes with an error (possibly with partial success).
|
||||
// The request is still staged, and the writer exits.
|
||||
// Use an error we control so we can check logs
|
||||
act = fix.fsm.resume(error::empty_field, 2u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, error_code(error::empty_field));
|
||||
BOOST_TEST(item.elm->is_staged());
|
||||
|
||||
// Logs
|
||||
fix.check_log({
|
||||
{logger::level::info, "Writer task error: Expected field value is empty. [boost.redis:5]"},
|
||||
});
|
||||
}
|
||||
|
||||
// A write is cancelled
|
||||
void test_cancel_write()
|
||||
{
|
||||
// Setup
|
||||
fixture fix;
|
||||
test_elem item;
|
||||
|
||||
// A request arrives before the writer starts
|
||||
fix.mpx.add(item.elm);
|
||||
|
||||
// Start. A write is triggered
|
||||
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::write);
|
||||
BOOST_TEST(item.elm->is_staged());
|
||||
|
||||
// Write cancelled and failed with operation_aborted
|
||||
act = fix.fsm.resume(asio::error::operation_aborted, 2u, cancellation_type_t::terminal);
|
||||
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
|
||||
BOOST_TEST(item.elm->is_staged());
|
||||
|
||||
// Logs
|
||||
fix.check_log({
|
||||
{logger::level::debug, "Writer task: cancelled (1)."},
|
||||
});
|
||||
}
|
||||
|
||||
// A write is cancelled after completing but before the handler is dispatched
|
||||
void test_cancel_write_edge()
|
||||
{
|
||||
// Setup
|
||||
fixture fix;
|
||||
test_elem item;
|
||||
|
||||
// A request arrives before the writer starts
|
||||
fix.mpx.add(item.elm);
|
||||
|
||||
// Start. A write is triggered
|
||||
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::write);
|
||||
BOOST_TEST(item.elm->is_staged());
|
||||
|
||||
// Write cancelled but without error
|
||||
act = fix.fsm.resume(error_code(), item.req.payload().size(), cancellation_type_t::terminal);
|
||||
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
|
||||
BOOST_TEST(item.elm->is_written());
|
||||
|
||||
// Logs
|
||||
fix.check_log({
|
||||
{logger::level::debug, "Writer task: cancelled (1)."},
|
||||
});
|
||||
}
|
||||
|
||||
// The wait was cancelled because of per-operation cancellation (rather than a notification)
|
||||
void test_cancel_wait()
|
||||
{
|
||||
// Setup
|
||||
fixture fix;
|
||||
test_elem item;
|
||||
|
||||
// Start. There is no request, so we wait
|
||||
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act, writer_action_type::wait);
|
||||
|
||||
// Sanity check: the writer doesn't touch the multiplexer after a cancellation
|
||||
fix.mpx.add(item.elm);
|
||||
|
||||
// Cancel the wait, setting the cancellation state
|
||||
act = fix.fsm.resume(asio::error::operation_aborted, 0u, asio::cancellation_type_t::terminal);
|
||||
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
|
||||
BOOST_TEST(item.elm->is_waiting());
|
||||
|
||||
// Logs
|
||||
fix.check_log({
|
||||
{logger::level::debug, "Writer task: cancelled (2)."},
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int main()
|
||||
{
|
||||
test_single_request();
|
||||
test_request_arrives_while_writing();
|
||||
test_no_request_at_startup();
|
||||
|
||||
test_write_error();
|
||||
|
||||
test_cancel_write();
|
||||
test_cancel_write_edge();
|
||||
test_cancel_wait();
|
||||
|
||||
return boost::report_errors();
|
||||
}
|
||||
Reference in New Issue
Block a user