2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Makes health checks flexible so they don't tear down connections under heavy load (#328)

Adds error::write_timeout

close #104
This commit is contained in:
Anarthal (Rubén Pérez)
2025-10-20 15:29:20 +02:00
committed by GitHub
parent da09787d29
commit 2b09ecbd78
26 changed files with 863 additions and 399 deletions

View File

@@ -11,6 +11,7 @@
#include <boost/assert/source_location.hpp>
#include <chrono>
#include <initializer_list>
#include <string>
#include <string_view>
@@ -44,6 +45,11 @@ struct log_fixture {
logger make_logger();
};
constexpr auto to_milliseconds(std::chrono::steady_clock::duration d)
{
return std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
}
} // namespace boost::redis::detail
#endif // BOOST_REDIS_TEST_SANSIO_UTILS_HPP

View File

@@ -6,14 +6,19 @@
#include <boost/redis/connection.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/core/lightweight_test.hpp>
#include "common.hpp"
#include <chrono>
#include <cstddef>
#include <string>
namespace net = boost::asio;
namespace redis = boost::redis;
@@ -21,6 +26,7 @@ using error_code = boost::system::error_code;
using connection = boost::redis::connection;
using boost::redis::request;
using boost::redis::ignore;
using boost::redis::generic_response;
using namespace std::chrono_literals;
namespace {
@@ -154,6 +160,98 @@ void test_disabled()
BOOST_TEST(exec2_finished);
}
// Receiving data is sufficient to consider our connection healthy.
// Sends a blocking request that causes PINGs to not be answered,
// and subscribes to a channel to receive pushes periodically.
// This simulates situations of heavy load, where PINGs may not be answered on time.
class test_flexible {
net::io_context ioc;
connection conn1{ioc}; // The one that simulates a heavy load condition
connection conn2{ioc}; // Publishes messages
net::steady_timer timer{ioc};
request publish_req;
bool run1_finished = false, run2_finished = false, exec_finished{false},
publisher_finished{false};
// Starts publishing messages to the channel
void start_publish()
{
conn2.async_exec(publish_req, ignore, [this](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error_code());
if (exec_finished) {
// The blocking request finished, we're done
conn2.cancel();
publisher_finished = true;
} else {
// Wait for some time and publish again
timer.expires_after(100ms);
timer.async_wait([this](error_code ec) {
BOOST_TEST_EQ(ec, error_code());
start_publish();
});
}
});
}
// Generates a sufficiently unique name for channels so
// tests may be run in parallel for different configurations
static std::string make_unique_id()
{
auto t = std::chrono::high_resolution_clock::now();
return "test-flexible-health-checks-" + std::to_string(t.time_since_epoch().count());
}
public:
test_flexible() = default;
void run()
{
// Setup
auto cfg = make_test_config();
cfg.health_check_interval = 500ms;
generic_response resp;
std::string channel_name = make_unique_id();
publish_req.push("PUBLISH", channel_name, "test_health_check_flexible");
// This request will block for much longer than the health check
// interval. If we weren't receiving pushes, the connection would be considered dead.
// If this request finishes successfully, the health checker is doing good
request blocking_req;
blocking_req.push("SUBSCRIBE", channel_name);
blocking_req.push("BLPOP", "any", 2);
blocking_req.get_config().cancel_if_unresponded = true;
blocking_req.get_config().cancel_on_connection_lost = true;
conn1.async_run(cfg, [&](error_code ec) {
run1_finished = true;
BOOST_TEST_EQ(ec, net::error::operation_aborted);
});
conn2.async_run(cfg, [&](error_code ec) {
run2_finished = true;
BOOST_TEST_EQ(ec, net::error::operation_aborted);
});
// BLPOP will return NIL, so we can't use ignore
conn1.async_exec(blocking_req, resp, [&](error_code ec, std::size_t) {
exec_finished = true;
BOOST_TEST_EQ(ec, error_code());
conn1.cancel();
});
start_publish();
ioc.run_for(test_timeout);
BOOST_TEST(run1_finished);
BOOST_TEST(run2_finished);
BOOST_TEST(exec_finished);
BOOST_TEST(publisher_finished);
}
};
} // namespace
int main()
@@ -161,6 +259,7 @@ int main()
test_reconnection();
test_error_code();
test_disabled();
test_flexible().run();
return boost::report_errors();
}

View File

@@ -95,7 +95,6 @@ BOOST_AUTO_TEST_CASE(echo_stress)
net::io_context ctx;
connection conn{ctx};
auto cfg = make_test_config();
cfg.health_check_interval = std::chrono::seconds::zero();
// Number of coroutines that will send pings sharing the same
// connection to redis.

View File

@@ -134,8 +134,7 @@ BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170)
auto conn = std::make_shared<connection>(ioc);
auto cfg = make_test_config();
cfg.health_check_interval = std::chrono::seconds(0);
conn->async_run(cfg, {}, net::detached);
conn->async_run(cfg, net::detached);
constexpr int repeat = 8000;
int remaining = repeat;

View File

@@ -138,7 +138,7 @@ void test_success()
// Simulate a successful write
BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// Simulate a successful read
read(mpx, "$5\r\nhello\r\n");
@@ -177,7 +177,7 @@ void test_parse_error()
// Simulate a successful write
BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// Simulate a read that will trigger an error.
// The second field should be a number (rather than the empty string).
@@ -239,7 +239,7 @@ void test_not_connected()
// Simulate a successful write
BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// Simulate a successful read
read(mpx, "$5\r\nhello\r\n");
@@ -329,7 +329,7 @@ void test_cancel_notwaiting_terminal_partial()
// The multiplexer starts writing the request
BOOST_TEST_EQ_MSG(mpx.prepare_write(), 1u, tc.name);
BOOST_TEST_EQ_MSG(mpx.commit_write(), 0u, tc.name);
BOOST_TEST_EQ_MSG(mpx.commit_write(mpx.get_write_buffer().size()), true, tc.name);
// A cancellation arrives
act = fsm.resume(true, tc.type);
@@ -368,7 +368,7 @@ void test_cancel_notwaiting_total()
// Simulate a successful write
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// We got requested a cancellation here, but we can't honor it
act = fsm.resume(true, asio::cancellation_type_t::total);

View File

@@ -529,6 +529,7 @@ BOOST_AUTO_TEST_CASE(cover_error)
check_error("boost.redis", boost::redis::error::incompatible_node_depth);
check_error("boost.redis", boost::redis::error::resp3_hello);
check_error("boost.redis", boost::redis::error::exceeds_maximum_read_buffer_size);
check_error("boost.redis", boost::redis::error::write_timeout);
}
std::string get_type_as_str(boost::redis::resp3::type t)

View File

@@ -15,14 +15,13 @@
#include <boost/assert/source_location.hpp>
#include <boost/core/lightweight_test.hpp>
#include "sansio_utils.hpp"
#include <iostream>
#include <memory>
#include <ostream>
#include <string>
#include <string_view>
#include "sansio_utils.hpp"
using boost::redis::request;
using boost::redis::detail::multiplexer;
using boost::redis::detail::consume_result;
@@ -118,7 +117,7 @@ void test_request_needs_more()
// Add the element to the multiplexer and simulate a successful write
mpx.add(item1.elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(item1.req.payload().size()));
BOOST_TEST(item1.elem_ptr->is_written());
BOOST_TEST(!item1.done);
@@ -161,6 +160,13 @@ void test_several_requests()
BOOST_TEST_EQ(mpx.prepare_write(), 3u);
BOOST_TEST_EQ(mpx.prepare_write(), 0u);
// The write buffer holds the 3 requests, coalesced
constexpr std::string_view expected_buffer =
"*2\r\n$4\r\nPING\r\n$7\r\ncmd-arg\r\n"
"*2\r\n$4\r\nPING\r\n$7\r\ncmd-arg\r\n"
"*2\r\n$9\r\nSUBSCRIBE\r\n$7\r\ncmd-arg\r\n";
BOOST_TEST_EQ(mpx.get_write_buffer(), expected_buffer);
// After coalescing the requests for writing their statuses should
// be changed to "staged".
BOOST_TEST(item1.elem_ptr->is_staged());
@@ -180,7 +186,7 @@ void test_several_requests()
// The commit_write call informs the multiplexer the payload was
// sent (e.g. written to the socket). This step releases requests
// that has no response.
BOOST_TEST_EQ(mpx.commit_write(), 1u);
BOOST_TEST(mpx.commit_write(expected_buffer.size()));
// The staged status should now have changed to written.
BOOST_TEST(item1.elem_ptr->is_written());
@@ -222,6 +228,61 @@ void test_several_requests()
BOOST_TEST(item3.done);
}
void test_short_writes()
{
// Setup
multiplexer mpx;
test_item item1{};
test_item item2{false};
// Add some requests to the multiplexer.
mpx.add(item1.elem_ptr);
mpx.add(item2.elem_ptr);
BOOST_TEST(item1.elem_ptr->is_waiting());
BOOST_TEST(item2.elem_ptr->is_waiting());
// Start writing them
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(
mpx.get_write_buffer(),
"*2\r\n$4\r\nPING\r\n$7\r\ncmd-arg\r\n"
"*2\r\n$9\r\nSUBSCRIBE\r\n$7\r\ncmd-arg\r\n");
BOOST_TEST(item1.elem_ptr->is_staged());
BOOST_TEST(item2.elem_ptr->is_staged());
// Write a small part. The write buffer gets updated, but request status is not changed
BOOST_TEST_NOT(mpx.commit_write(8u));
BOOST_TEST_EQ(
mpx.get_write_buffer(),
"PING\r\n$7\r\ncmd-arg\r\n"
"*2\r\n$9\r\nSUBSCRIBE\r\n$7\r\ncmd-arg\r\n");
BOOST_TEST(item1.elem_ptr->is_staged());
BOOST_TEST(item2.elem_ptr->is_staged());
// Write another part
BOOST_TEST_NOT(mpx.commit_write(19u));
BOOST_TEST_EQ(mpx.get_write_buffer(), "*2\r\n$9\r\nSUBSCRIBE\r\n$7\r\ncmd-arg\r\n");
BOOST_TEST(item1.elem_ptr->is_staged());
BOOST_TEST(item2.elem_ptr->is_staged());
// A zero-size write doesn't cause trouble
BOOST_TEST_NOT(mpx.commit_write(0u));
BOOST_TEST_EQ(mpx.get_write_buffer(), "*2\r\n$9\r\nSUBSCRIBE\r\n$7\r\ncmd-arg\r\n");
BOOST_TEST(item1.elem_ptr->is_staged());
BOOST_TEST(item2.elem_ptr->is_staged());
// Write everything except the last byte
BOOST_TEST_NOT(mpx.commit_write(31u));
BOOST_TEST_EQ(mpx.get_write_buffer(), "\n");
BOOST_TEST(item1.elem_ptr->is_staged());
BOOST_TEST(item2.elem_ptr->is_staged());
// Write the last byte
BOOST_TEST(mpx.commit_write(1u));
BOOST_TEST(item1.elem_ptr->is_written());
BOOST_TEST(item2.elem_ptr->is_done());
}
// The response to a request is received before its write
// confirmation. This might happen on heavy load
void test_request_response_before_write()
@@ -251,7 +312,7 @@ void test_request_response_before_write()
item.reset();
// The write gets confirmed and causes no problem
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
}
void test_push()
@@ -395,7 +456,7 @@ void test_mix_responses_pushes()
mpx.add(item1.elem_ptr);
mpx.add(item2.elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
BOOST_TEST(item1.elem_ptr->is_written());
BOOST_TEST(!item1.done);
BOOST_TEST(item2.elem_ptr->is_written());
@@ -488,7 +549,7 @@ void test_cancel_waiting()
// We can progress the other request normally
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
error_code ec;
read(mpx, "$11\r\nHello world\r\n");
auto res = mpx.consume(ec);
@@ -518,7 +579,7 @@ void test_cancel_staged()
item1.reset(); // Verify we don't reference this item anyhow
// The write gets confirmed
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// The cancelled request's response arrives. It gets discarded
error_code ec;
@@ -556,7 +617,7 @@ void test_cancel_staged_command_without_response()
item1.reset(); // Verify we don't reference this item anyhow
// The write gets confirmed
BOOST_TEST_EQ(mpx.commit_write(), 1u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// The 2nd request's response arrives. It gets parsed successfully
error_code ec;
@@ -582,7 +643,7 @@ void test_cancel_written()
// A write succeeds
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// Cancel the first request
mpx.cancel(item1->elem_ptr);
@@ -623,7 +684,7 @@ void test_cancel_written_half_parsed_response()
// A write succeeds
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// Get the response for the 1st command in req1
error_code ec;
@@ -686,7 +747,7 @@ void test_cancel_written_null_error()
// A write succeeds
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
// Cancel the first request
mpx.cancel(item1->elem_ptr);
@@ -741,7 +802,7 @@ void test_cancel_on_connection_lost()
mpx.add(item_written1.elem_ptr);
mpx.add(item_written2.elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
mpx.add(item_staged1.elem_ptr);
mpx.add(item_staged2.elem_ptr);
@@ -794,7 +855,7 @@ void test_cancel_on_connection_lost_abandoned()
mpx.add(item_written1->elem_ptr);
mpx.add(item_written2->elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
mpx.add(item_staged1->elem_ptr);
mpx.add(item_staged2->elem_ptr);
@@ -898,7 +959,7 @@ void test_reset()
// We're able to add write requests and read responses - all state was reset
mpx.add(item2.elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(mpx.commit_write(mpx.get_write_buffer().size()));
std::string_view response_buffer = "$11\r\nHello world\r\n";
read(mpx, response_buffer);
@@ -919,6 +980,7 @@ int main()
test_request_needs_more();
test_several_requests();
test_request_response_before_write();
test_short_writes();
test_push();
test_push_needs_more();
test_push_heuristics_no_request();

View File

@@ -18,6 +18,7 @@
#include "sansio_utils.hpp"
#include <chrono>
#include <string_view>
namespace net = boost::asio;
@@ -32,6 +33,7 @@ using redis::config;
using redis::detail::connection_state;
using action = redis::detail::reader_fsm::action;
using redis::logger;
using namespace std::chrono_literals;
// Operators
static const char* to_string(action::type type)
@@ -50,16 +52,29 @@ std::ostream& operator<<(std::ostream& os, action::type type) { return os << to_
bool operator==(const action& lhs, const action& rhs) noexcept
{
return lhs.type_ == rhs.type_ && lhs.push_size_ == rhs.push_size_ && lhs.ec_ == rhs.ec_;
if (lhs.get_type() != rhs.get_type())
return false;
switch (lhs.get_type()) {
case action::type::done: return lhs.error() == rhs.error();
case action::type::read_some: return lhs.timeout() == rhs.timeout();
case action::type::notify_push_receiver: return lhs.push_size() == rhs.push_size();
default: BOOST_ASSERT(false); return false;
}
}
std::ostream& operator<<(std::ostream& os, const action& act)
{
os << "action{ .type=" << act.type_;
if (act.type_ == action::type::done)
os << ", .error=" << act.ec_;
else if (act.type_ == action::type::notify_push_receiver)
os << ", .push_size=" << act.push_size_;
auto t = act.get_type();
os << "action{ .type=" << t;
switch (t) {
case action::type::done: os << ", .error=" << act.error(); break;
case action::type::read_some:
os << ", .timeout=" << to_milliseconds(act.timeout()) << "ms";
break;
case action::type::notify_push_receiver: os << ", .push_size=" << act.push_size(); break;
default: BOOST_ASSERT(false);
}
return os << " }";
}
@@ -84,7 +99,11 @@ struct fixture : redis::detail::log_fixture {
connection_state st{make_logger()};
generic_response resp;
fixture() { st.mpx.set_receive_adapter(any_adapter{resp}); }
fixture()
{
st.mpx.set_receive_adapter(any_adapter{resp});
st.cfg.health_check_interval = 3s;
}
};
void test_push()
@@ -94,7 +113,7 @@ void test_push()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// The fsm is asking for data.
std::string const payload =
@@ -118,7 +137,7 @@ void test_push()
// All pushes were delivered so the fsm should demand more data
act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// Check logging
fix.check_log({
@@ -135,7 +154,7 @@ void test_read_needs_more()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// Split the incoming message in three random parts and deliver
// them to the reader individually.
@@ -144,12 +163,12 @@ void test_read_needs_more()
// Passes the first part to the fsm.
copy_to(fix.st.mpx, msg[0]);
act = fsm.resume(fix.st, msg[0].size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// Passes the second part to the fsm.
copy_to(fix.st.mpx, msg[1]);
act = fsm.resume(fix.st, msg[1].size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// Passes the third and last part to the fsm, next it should ask us
// to deliver the message.
@@ -159,7 +178,7 @@ void test_read_needs_more()
// All pushes were delivered so the fsm should demand more data
act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// Check logging
fix.check_log({
@@ -175,6 +194,44 @@ void test_read_needs_more()
});
}
void test_health_checks_disabled()
{
fixture fix;
reader_fsm fsm;
fix.st.cfg.health_check_interval = 0s;
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::read_some(0s));
// Split the message into two so we cover both the regular read and the needs more branch
constexpr std::string_view msg[] = {">3\r\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"};
// Passes the first part to the fsm.
copy_to(fix.st.mpx, msg[0]);
act = fsm.resume(fix.st, msg[0].size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::read_some(0s));
// Push delivery complete
copy_to(fix.st.mpx, msg[1]);
act = fsm.resume(fix.st, msg[1].size(), error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::notify_push_receiver(25u));
// All pushes were delivered so the fsm should demand more data
act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::read_some(0s));
// Check logging
fix.check_log({
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 14 bytes read" },
{logger::level::debug, "Reader task: incomplete message received"},
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 11 bytes read" },
{logger::level::debug, "Reader task: issuing read" },
});
}
void test_read_error()
{
fixture fix;
@@ -182,7 +239,7 @@ void test_read_error()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// The fsm is asking for data.
std::string const payload = ">1\r\n+msg1\r\n";
@@ -201,6 +258,29 @@ void test_read_error()
});
}
// A timeout in a read means that the connection is unhealthy (i.e. a PING timed out)
void test_read_timeout()
{
fixture fix;
reader_fsm fsm;
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::read_some(6s));
// Timeout
act = fsm.resume(fix.st, 0, {net::error::operation_aborted}, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code{redis::error::pong_timeout});
// Check logging
fix.check_log({
// clang-format off
{logger::level::debug, "Reader task: issuing read" },
{logger::level::debug, "Reader task: 0 bytes read, error: Pong timeout. [boost.redis:19]"},
// clang-format on
});
}
void test_parse_error()
{
fixture fix;
@@ -208,7 +288,7 @@ void test_parse_error()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// The fsm is asking for data.
std::string const payload = ">a\r\n";
@@ -235,7 +315,7 @@ void test_push_deliver_error()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// The fsm is asking for data.
std::string const payload = ">1\r\n+msg1\r\n";
@@ -269,7 +349,7 @@ void test_max_read_buffer_size()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// Passes the first part to the fsm.
std::string const part1 = ">3\r\n";
@@ -296,7 +376,7 @@ void test_cancel_read()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// The read was cancelled (maybe after delivering some bytes)
constexpr std::string_view payload = ">1\r\n";
@@ -322,7 +402,7 @@ void test_cancel_read_edge()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// Deliver a push, and notify a cancellation.
// This can happen if the cancellation signal arrives before the read handler runs
@@ -345,7 +425,7 @@ void test_cancel_push_delivery()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// The fsm is asking for data.
constexpr std::string_view payload =
@@ -377,7 +457,7 @@ void test_cancel_push_delivery_edge()
// Initiate
auto act = fsm.resume(fix.st, 0, error_code(), cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act, action::read_some(6s));
// The fsm is asking for data.
constexpr std::string_view payload =
@@ -409,8 +489,10 @@ int main()
{
test_push();
test_read_needs_more();
test_health_checks_disabled();
test_read_error();
test_read_timeout();
test_parse_error();
test_push_deliver_error();
test_max_read_buffer_size();

View File

@@ -7,6 +7,7 @@
//
#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/writer_fsm.hpp>
#include <boost/redis/logger.hpp>
@@ -20,8 +21,10 @@
#include "sansio_utils.hpp"
#include <chrono>
#include <memory>
#include <ostream>
#include <string_view>
using namespace boost::redis;
namespace asio = boost::asio;
@@ -30,18 +33,20 @@ using detail::multiplexer;
using detail::writer_action_type;
using detail::consume_result;
using detail::writer_action;
using detail::connection_state;
using boost::system::error_code;
using boost::asio::cancellation_type_t;
using detail::connection_logger;
using namespace std::chrono_literals;
// Operators
static const char* to_string(writer_action_type value)
{
switch (value) {
case writer_action_type::done: return "writer_action_type::done";
case writer_action_type::write: return "writer_action_type::write";
case writer_action_type::wait: return "writer_action_type::wait";
default: return "<unknown writer_action_type>";
case writer_action_type::done: return "writer_action_type::done";
case writer_action_type::write_some: return "writer_action_type::write";
case writer_action_type::wait: return "writer_action_type::wait";
default: return "<unknown writer_action_type>";
}
}
@@ -55,14 +60,30 @@ std::ostream& operator<<(std::ostream& os, writer_action_type type)
bool operator==(const writer_action& lhs, const writer_action& rhs) noexcept
{
return lhs.type == rhs.type && lhs.ec == rhs.ec;
if (lhs.type() != rhs.type())
return false;
switch (lhs.type()) {
case writer_action_type::done: return lhs.error() == rhs.error();
case writer_action_type::write_some:
case writer_action_type::wait: return lhs.timeout() == rhs.timeout();
default: BOOST_ASSERT(false);
}
return false;
}
std::ostream& operator<<(std::ostream& os, const writer_action& act)
{
os << "writer_action{ .type=" << act.type;
if (act.type == writer_action_type::done)
os << ", .error=" << act.ec;
auto t = act.type();
os << "writer_action{ .type=" << t;
switch (t) {
case writer_action_type::done: os << ", .error=" << act.error(); break;
case writer_action_type::write_some:
case writer_action_type::wait:
os << ", .timeout=" << to_milliseconds(act.timeout()) << "ms";
break;
default: BOOST_ASSERT(false);
}
return os << " }";
}
@@ -89,9 +110,14 @@ struct test_elem {
};
struct fixture : detail::log_fixture {
multiplexer mpx;
connection_logger lgr{make_logger()};
writer_fsm fsm{mpx, lgr};
connection_state st{make_logger()};
writer_fsm fsm;
fixture()
{
st.ping_req.push("PING", "ping_msg"); // would be set up by the runner
st.cfg.health_check_interval = 4s;
}
};
// A single request is written, then we wait and repeat
@@ -102,35 +128,37 @@ void test_single_request()
test_elem item1, item2;
// A request arrives before the writer starts
fix.mpx.add(item1.elm);
fix.st.mpx.add(item1.elm);
// Start. A write is triggered, and the request is marked as staged
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::write);
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item1.elm->is_staged());
// The write completes successfully. The request is written, and we go back to sleep.
act = fix.fsm.resume(error_code(), item1.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::wait);
act = fix.fsm
.resume(fix.st, error_code(), item1.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
BOOST_TEST(item1.elm->is_written());
// Another request arrives
fix.mpx.add(item2.elm);
fix.st.mpx.add(item2.elm);
// The wait is cancelled to signal we've got a new request
act = fix.fsm.resume(asio::error::operation_aborted, 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::write);
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item2.elm->is_staged());
// Write successful
act = fix.fsm.resume(error_code(), item2.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::wait);
act = fix.fsm
.resume(fix.st, error_code(), item2.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
BOOST_TEST(item2.elm->is_written());
// Logs
fix.check_log({
{logger::level::info, "Writer task: 24 bytes written."},
{logger::level::info, "Writer task: 24 bytes written."},
{logger::level::debug, "Writer task: 24 bytes written."},
{logger::level::debug, "Writer task: 24 bytes written."},
});
}
@@ -142,32 +170,34 @@ void test_request_arrives_while_writing()
test_elem item1, item2;
// A request arrives before the writer starts
fix.mpx.add(item1.elm);
fix.st.mpx.add(item1.elm);
// Start. A write is triggered, and the request is marked as staged
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::write);
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item1.elm->is_staged());
// While the write is outstanding, a new request arrives
fix.mpx.add(item2.elm);
fix.st.mpx.add(item2.elm);
// The write completes successfully. The request is written,
// and we start writing the new one
act = fix.fsm.resume(error_code(), item1.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::write);
act = fix.fsm
.resume(fix.st, error_code(), item1.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item1.elm->is_written());
BOOST_TEST(item2.elm->is_staged());
// Write successful
act = fix.fsm.resume(error_code(), item2.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::wait);
act = fix.fsm
.resume(fix.st, error_code(), item2.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
BOOST_TEST(item2.elm->is_written());
// Logs
fix.check_log({
{logger::level::info, "Writer task: 24 bytes written."},
{logger::level::info, "Writer task: 24 bytes written."},
{logger::level::debug, "Writer task: 24 bytes written."},
{logger::level::debug, "Writer task: 24 bytes written."},
});
}
@@ -179,25 +209,166 @@ void test_no_request_at_startup()
test_elem item;
// Start. There is no request, so we wait
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::wait);
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
// A request arrives
fix.mpx.add(item.elm);
fix.st.mpx.add(item.elm);
// The wait is cancelled to signal we've got a new request
act = fix.fsm.resume(asio::error::operation_aborted, 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::write);
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item.elm->is_staged());
// Write successful
act = fix.fsm.resume(error_code(), item.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::wait);
act = fix.fsm.resume(fix.st, error_code(), item.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
BOOST_TEST(item.elm->is_written());
// Logs
fix.check_log({
{logger::level::info, "Writer task: 24 bytes written."},
{logger::level::debug, "Writer task: 24 bytes written."},
});
}
// We correctly handle short writes
void test_short_writes()
{
// Setup
fixture fix;
test_elem item1;
// A request arrives before the writer starts
fix.st.mpx.add(item1.elm);
// Start. A write is triggered, and the request is marked as staged
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item1.elm->is_staged());
// We write a few bytes. It's not the entire message, so we write again
act = fix.fsm.resume(fix.st, error_code(), 2u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item1.elm->is_staged());
// We write some more bytes, but still not the entire message.
act = fix.fsm.resume(fix.st, error_code(), 5u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item1.elm->is_staged());
// A zero size write doesn't cause trouble
act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item1.elm->is_staged());
// Complete writing the message (the entire payload is 24 bytes long)
act = fix.fsm.resume(fix.st, error_code(), 17u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
BOOST_TEST(item1.elm->is_written());
// Logs
fix.check_log({
{logger::level::debug, "Writer task: 2 bytes written." },
{logger::level::debug, "Writer task: 5 bytes written." },
{logger::level::debug, "Writer task: 0 bytes written." },
{logger::level::debug, "Writer task: 17 bytes written."},
});
}
// If no data arrives during the health check interval, a ping is written
void test_ping()
{
// Setup
fixture fix;
error_code ec;
constexpr std::string_view ping_payload = "*2\r\n$4\r\nPING\r\n$8\r\nping_msg\r\n";
// Start. There is no request, so we wait
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
// No request arrives during the wait interval so a ping is added
act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST_EQ(fix.st.mpx.get_write_buffer(), ping_payload);
// Write successful
act = fix.fsm.resume(fix.st, error_code(), ping_payload.size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
// Simulate a successful response to the PING
constexpr std::string_view ping_response = "$8\r\nping_msg\r\n";
read(fix.st.mpx, ping_response);
auto res = fix.st.mpx.consume(ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST(res.first == consume_result::got_response);
BOOST_TEST_EQ(res.second, ping_response.size());
// Logs
fix.check_log({
{logger::level::debug, "Writer task: 28 bytes written."},
});
}
// Disabled health checks don't cause trouble
void test_health_checks_disabled()
{
// Setup
fixture fix;
test_elem item;
fix.st.cfg.health_check_interval = 0s;
// A request arrives before the writer starts
fix.st.mpx.add(item.elm);
// Start. A write is triggered, and the request is marked as staged
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(0s));
BOOST_TEST(item.elm->is_staged());
// The write completes successfully. The request is written, and we go back to sleep.
act = fix.fsm.resume(fix.st, error_code(), item.req.payload().size(), cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(0s));
BOOST_TEST(item.elm->is_written());
// Logs
fix.check_log({
{logger::level::debug, "Writer task: 24 bytes written."},
});
}
// If the server answers with an error in PING, we log it and produce an error
void test_ping_error()
{
// Setup
fixture fix;
error_code ec;
// Start. There is no request, so we wait
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
// No request arrives during the wait interval so a ping is added
act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
// Write successful
const auto ping_size = fix.st.mpx.get_write_buffer().size();
act = fix.fsm.resume(fix.st, error_code(), ping_size, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
// Simulate an error response to the PING
constexpr std::string_view ping_response = "-ERR: bad command\r\n";
read(fix.st.mpx, ping_response);
auto res = fix.st.mpx.consume(ec);
BOOST_TEST_EQ(ec, error::resp3_simple_error);
BOOST_TEST(res.first == consume_result::got_response);
BOOST_TEST_EQ(res.second, ping_response.size());
// Logs
fix.check_log({
{logger::level::debug, "Writer task: 28 bytes written." },
{logger::level::info, "Health checker: server answered ping with an error: ERR: bad command"},
});
}
@@ -209,23 +380,51 @@ void test_write_error()
test_elem item;
// A request arrives before the writer starts
fix.mpx.add(item.elm);
fix.st.mpx.add(item.elm);
// Start. A write is triggered, and the request is marked as staged
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::write);
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item.elm->is_staged());
// The write completes with an error (possibly with partial success).
// The request is still staged, and the writer exits.
// Use an error we control so we can check logs
act = fix.fsm.resume(error::empty_field, 2u, cancellation_type_t::none);
act = fix.fsm.resume(fix.st, error::empty_field, 2u, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::empty_field));
BOOST_TEST(item.elm->is_staged());
// Logs
fix.check_log({
{logger::level::info, "Writer task error: Expected field value is empty. [boost.redis:5]"},
{logger::level::debug, "Writer task: 2 bytes written." },
{logger::level::debug, "Writer task error: Expected field value is empty. [boost.redis:5]"},
});
}
void test_write_timeout()
{
// Setup
fixture fix;
test_elem item;
// A request arrives before the writer starts
fix.st.mpx.add(item.elm);
// Start. A write is triggered, and the request is marked as staged
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item.elm->is_staged());
// The write times out, so it completes with operation_aborted
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, error_code(error::write_timeout));
BOOST_TEST(item.elm->is_staged());
// Logs
fix.check_log({
{logger::level::debug, "Writer task: 0 bytes written." },
{logger::level::debug,
"Writer task error: Timeout while writing data to the server. [boost.redis:27]"},
});
}
@@ -237,21 +436,22 @@ void test_cancel_write()
test_elem item;
// A request arrives before the writer starts
fix.mpx.add(item.elm);
fix.st.mpx.add(item.elm);
// Start. A write is triggered
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::write);
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item.elm->is_staged());
// Write cancelled and failed with operation_aborted
act = fix.fsm.resume(asio::error::operation_aborted, 2u, cancellation_type_t::terminal);
act = fix.fsm.resume(fix.st, asio::error::operation_aborted, 2u, cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
BOOST_TEST(item.elm->is_staged());
// Logs
fix.check_log({
{logger::level::debug, "Writer task: cancelled (1)."},
{logger::level::debug, "Writer task: 2 bytes written."},
{logger::level::debug, "Writer task: cancelled (1)." },
});
}
@@ -263,21 +463,23 @@ void test_cancel_write_edge()
test_elem item;
// A request arrives before the writer starts
fix.mpx.add(item.elm);
fix.st.mpx.add(item.elm);
// Start. A write is triggered
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::write);
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::write_some(4s));
BOOST_TEST(item.elm->is_staged());
// Write cancelled but without error
act = fix.fsm.resume(error_code(), item.req.payload().size(), cancellation_type_t::terminal);
act = fix.fsm
.resume(fix.st, error_code(), item.req.payload().size(), cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
BOOST_TEST(item.elm->is_written());
// Logs
fix.check_log({
{logger::level::debug, "Writer task: cancelled (1)."},
{logger::level::debug, "Writer task: 24 bytes written."},
{logger::level::debug, "Writer task: cancelled (1)." },
});
}
@@ -289,14 +491,18 @@ void test_cancel_wait()
test_elem item;
// Start. There is no request, so we wait
auto act = fix.fsm.resume(error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action_type::wait);
auto act = fix.fsm.resume(fix.st, error_code(), 0u, cancellation_type_t::none);
BOOST_TEST_EQ(act, writer_action::wait(4s));
// Sanity check: the writer doesn't touch the multiplexer after a cancellation
fix.mpx.add(item.elm);
fix.st.mpx.add(item.elm);
// Cancel the wait, setting the cancellation state
act = fix.fsm.resume(asio::error::operation_aborted, 0u, asio::cancellation_type_t::terminal);
act = fix.fsm.resume(
fix.st,
asio::error::operation_aborted,
0u,
asio::cancellation_type_t::terminal);
BOOST_TEST_EQ(act, error_code(asio::error::operation_aborted));
BOOST_TEST(item.elm->is_waiting());
@@ -313,8 +519,14 @@ int main()
test_single_request();
test_request_arrives_while_writing();
test_no_request_at_startup();
test_short_writes();
test_health_checks_disabled();
test_ping();
test_ping_error();
test_write_error();
test_write_timeout();
test_cancel_write();
test_cancel_write_edge();