2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Simplifies the read_buffer and adds rotated bytes to usage.

Data rotation in the read buffer creates latency, we know it
is preset but so far its magnitude was unknown. This PR adds
it as a new field to the usage struct. For example, the
test_conn_echo_stress outputs now

   Commands sent: 780,002
   Bytes sent: 32,670,085
   Responses received: 780,001
   Pushes received: 750,001
   Bytes received (response): 3,210,147
   Bytes received (push): 32,250,036
   Bytes rotated: 3,109,190,184

In total approximately 34Mb are received but 3Gb are
rotated.
This commit is contained in:
Marcelo Zimbres
2025-09-14 21:43:19 +02:00
parent e414b3941a
commit a70bdf6574
20 changed files with 323 additions and 164 deletions

View File

@@ -15,7 +15,7 @@ target_compile_features(boost_redis_src PRIVATE cxx_std_17)
target_link_libraries(boost_redis_src PRIVATE boost_redis_project_options)
# Test utils
add_library(boost_redis_tests_common STATIC common.cpp)
add_library(boost_redis_tests_common STATIC common.cpp sansio_utils.cpp)
target_compile_features(boost_redis_tests_common PRIVATE cxx_std_17)
target_link_libraries(boost_redis_tests_common PRIVATE boost_redis_project_options)

View File

@@ -42,6 +42,7 @@ lib redis_test_common
:
boost_redis.cpp
common.cpp
sansio_utils.cpp
: requirements $(requirements)
: usage-requirements $(requirements)
;

View File

@@ -68,10 +68,3 @@ void run_coroutine_test(net::awaitable<void> op, std::chrono::steady_clock::dura
throw std::runtime_error("Coroutine test did not finish");
}
#endif // BOOST_ASIO_HAS_CO_AWAIT
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data)
{
auto const buffer = rbuf.get_append_buffer();
BOOST_ASSERT(data.size() <= buffer.size());
std::copy(data.begin(), data.end(), buffer.begin());
}

View File

@@ -35,5 +35,3 @@ void run(
boost::redis::config cfg = make_test_config(),
boost::system::error_code ec = boost::asio::error::operation_aborted,
boost::redis::operation op = boost::redis::operation::receive);
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data);

25
test/sansio_utils.cpp Normal file
View File

@@ -0,0 +1,25 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include "sansio_utils.hpp"
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/core/ignore_unused.hpp>
namespace boost::redis::detail {
void read(multiplexer& mpx, std::string_view data)
{
auto const ec = mpx.prepare_read();
ignore_unused(ec);
BOOST_ASSERT(ec == system::error_code{});
auto const buffer = mpx.get_prepared_read_buffer();
BOOST_ASSERT(buffer.size() >= data.size());
std::copy(data.cbegin(), data.cend(), buffer.begin());
mpx.commit_read(data.size());
}
} // namespace boost::redis::detail

28
test/sansio_utils.hpp Normal file
View File

@@ -0,0 +1,28 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_TEST_SANSIO_UTILS_HPP
#define BOOST_REDIS_TEST_SANSIO_UTILS_HPP
#include <string_view>
namespace boost::redis::detail {
class multiplexer;
// Read data into the multiplexer with the following steps
//
// 1. prepare_read
// 2. get_read_buffer
// 3. Copy data in the buffer from 2.
// 4. commit_read;
//
// This is used in the multiplexer tests.
void read(multiplexer& mpx, std::string_view data);
} // namespace boost::redis::detail
#endif // BOOST_REDIS_TEST_SANSIO_UTILS_HPP

View File

@@ -43,8 +43,9 @@ std::ostream& operator<<(std::ostream& os, usage const& u)
<< "Bytes sent: " << u.bytes_sent << "\n"
<< "Responses received: " << u.responses_received << "\n"
<< "Pushes received: " << u.pushes_received << "\n"
<< "Response bytes received: " << u.response_bytes_received << "\n"
<< "Push bytes received: " << u.push_bytes_received;
<< "Bytes received (response): " << u.response_bytes_received << "\n"
<< "Bytes received (push): " << u.push_bytes_received << "\n"
<< "Bytes rotated: " << u.bytes_rotated;
return os;
}

View File

@@ -14,12 +14,15 @@
#include <boost/asio/error.hpp>
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include <boost/assert.hpp>
#include <cstddef>
#include <memory>
#include <ostream>
#include <utility>
#include "sansio_utils.hpp"
using namespace boost::redis;
namespace asio = boost::asio;
using detail::exec_fsm;
@@ -125,7 +128,8 @@ void test_success()
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
// Simulate a successful read
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
read(mpx, "$5\r\nhello\r\n");
auto req_status = mpx.consume(ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
@@ -166,7 +170,8 @@ void test_parse_error()
// The second field should be a number (rather than the empty string).
// Note that although part of the buffer was consumed, the multiplexer
// currently throws this information away.
auto req_status = mpx.consume_next("*2\r\n$5\r\nhello\r\n:\r\n", ec);
read(mpx, "*2\r\n$5\r\nhello\r\n:\r\n");
auto req_status = mpx.consume(ec);
BOOST_TEST_EQ(ec, error::empty_field);
BOOST_TEST_EQ(req_status.second, 15u);
BOOST_TEST_EQ(input.done_calls, 1u);
@@ -224,7 +229,8 @@ void test_not_connected()
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
// Simulate a successful read
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
read(mpx, "$5\r\nhello\r\n");
auto req_status = mpx.consume(ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
@@ -317,9 +323,10 @@ void test_cancel_notwaiting_terminal_partial()
BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted));
input.reset(); // Verify we don't access the request or response after completion
// When the response to this request arrives, it gets ignored
error_code ec;
auto res = mpx.consume_next("-ERR wrong command\r\n", ec);
// When the response to this request arrives, it gets ignored
read(mpx, "-ERR wrong command\r\n");
auto res = mpx.consume(ec);
BOOST_TEST_EQ_MSG(ec, error_code(), tc.name);
BOOST_TEST_EQ_MSG(res.first, consume_result::got_response, tc.name);
@@ -355,7 +362,8 @@ void test_cancel_notwaiting_total()
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful read
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
read(mpx, "$5\r\nhello\r\n");
auto req_status = mpx.consume(ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed

View File

@@ -197,22 +197,22 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_error)
// Usual case, max size is bigger then requested size.
buf.set_config({10, 10});
auto ec = buf.prepare_append();
auto ec = buf.prepare();
BOOST_TEST(!ec);
buf.commit_append(10);
buf.commit(10);
// Corner case, max size is equal to the requested size.
buf.set_config({10, 20});
ec = buf.prepare_append();
ec = buf.prepare();
BOOST_TEST(!ec);
buf.commit_append(10);
buf.consume_committed(20);
buf.commit(10);
buf.consume(20);
auto const tmp = buf;
// Error case, max size is smaller to the requested size.
buf.set_config({10, 9});
ec = buf.prepare_append();
ec = buf.prepare();
BOOST_TEST(ec == error_code{boost::redis::error::exceeds_maximum_read_buffer_size});
// Check that an error call has no side effects.
@@ -227,19 +227,34 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_consume_only_committed_data)
read_buffer buf;
buf.set_config({10, 10});
auto ec = buf.prepare_append();
auto ec = buf.prepare();
BOOST_TEST(!ec);
auto res = buf.consume(5);
// No data has been committed yet so nothing can be consummed.
BOOST_CHECK_EQUAL(buf.consume_committed(5), 0u);
BOOST_CHECK_EQUAL(res.consumed, 0u);
buf.commit_append(10);
// If nothing was consumed, nothing got rotated.
BOOST_CHECK_EQUAL(res.rotated, 0u);
// All five bytes can be consumed.
BOOST_CHECK_EQUAL(buf.consume_committed(5), 5u);
buf.commit(10);
res = buf.consume(5);
// All five bytes should have been consumed.
BOOST_CHECK_EQUAL(res.consumed, 5u);
// We added a total of 10 bytes and consumed 5, that means, 5 were
// rotated.
BOOST_CHECK_EQUAL(res.rotated, 5u);
res = buf.consume(7);
// Only the remaining five bytes can be consumed
BOOST_CHECK_EQUAL(buf.consume_committed(7), 5u);
BOOST_CHECK_EQUAL(res.consumed, 5u);
// No bytes to rotated.
BOOST_CHECK_EQUAL(res.rotated, 0u);
}
BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
@@ -249,10 +264,10 @@ BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
read_buffer buf;
buf.set_config({10, 10});
auto ec = buf.prepare_append();
auto ec = buf.prepare();
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u);
BOOST_CHECK_EQUAL(buf.get_prepared().size(), 10u);
}
BOOST_AUTO_TEST_CASE(check_counter_adapter)

View File

@@ -21,6 +21,8 @@
#include <string>
#include <string_view>
#include "sansio_utils.hpp"
using boost::redis::request;
using boost::redis::detail::multiplexer;
using boost::redis::detail::consume_result;
@@ -122,11 +124,13 @@ void test_request_needs_more()
// Parse part of the response
error_code ec;
auto ret = mpx.consume_next("$11\r\nhello", ec);
read(mpx, "$11\r\nhello");
auto ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
// Parse the rest of it
ret = mpx.consume_next("$11\r\nhello world\r\n", ec);
read(mpx, " world\r\n");
ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
const node expected[] = {
{type::blob_string, 1u, 0u, "hello world"},
@@ -191,7 +195,8 @@ void test_several_requests()
// Consumes the next message in the read buffer.
error_code ec;
auto ret = mpx.consume_next("+one\r\n", ec);
read(mpx, "+one\r\n");
auto ret = mpx.consume(ec);
// The read operation should have been successful.
BOOST_TEST_EQ(ret.first, consume_result::got_response);
@@ -204,7 +209,8 @@ void test_several_requests()
// Consumes the second message in the read buffer
// Consumes the next message in the read buffer.
ret = mpx.consume_next("+two\r\n", ec);
read(mpx, "+two\r\n");
ret = mpx.consume(ec);
// The read operation should have been successful.
BOOST_TEST_EQ(ret.first, consume_result::got_response);
@@ -234,7 +240,8 @@ void test_request_response_before_write()
// The response is received. The request is marked as done,
// even if the write hasn't been confirmed yet
error_code ec;
auto ret = mpx.consume_next("+one\r\n", ec);
read(mpx, "+one\r\n");
auto ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ec, error_code());
@@ -256,7 +263,8 @@ void test_push()
// Consume an entire push
error_code ec;
auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec);
read(mpx, ">2\r\n+one\r\n+two\r\n");
auto const ret = mpx.consume(ec);
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
@@ -275,20 +283,17 @@ void test_push_needs_more()
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
std::string msg;
// Only part of the message available.
msg += ">2\r\n+one\r";
// Consume it
// Consume it only part of the message available.
error_code ec;
auto ret = mpx.consume_next(msg, ec);
read(mpx, ">2\r\n+one\r");
auto ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
// The entire message becomes available
msg += "\n+two\r\n";
ret = mpx.consume_next(msg, ec);
read(mpx, "\n+two\r\n");
ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 16u);
@@ -311,7 +316,8 @@ void test_push_heuristics_no_request()
// Response received, but no request has been sent
error_code ec;
auto const ret = mpx.consume_next("+Hello world\r\n", ec);
read(mpx, "+Hello world\r\n");
auto const ret = mpx.consume(ec);
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
@@ -337,7 +343,8 @@ void test_push_heuristics_request_waiting()
// Response received, but no request has been sent
error_code ec;
auto const ret = mpx.consume_next("+Hello world\r\n", ec);
read(mpx, "+Hello world\r\n");
auto const ret = mpx.consume(ec);
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
@@ -366,7 +373,8 @@ void test_push_heuristics_request_without_response()
// Response received (e.g. syntax error)
error_code ec;
auto const ret = mpx.consume_next("-ERR wrong syntax\r\n", ec);
read(mpx, "-ERR wrong syntax\r\n");
auto const ret = mpx.consume(ec);
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
@@ -396,7 +404,8 @@ void test_mix_responses_pushes()
// Push
std::string_view push1_buffer = ">2\r\n+one\r\n+two\r\n";
error_code ec;
auto ret = mpx.consume_next(push1_buffer, ec);
read(mpx, push1_buffer);
auto ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 16u);
std::vector<node> expected{
@@ -410,7 +419,8 @@ void test_mix_responses_pushes()
// First response
std::string_view response1_buffer = "$11\r\nHello world\r\n";
ret = mpx.consume_next(response1_buffer, ec);
read(mpx, response1_buffer);
ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, 18u);
expected = {
@@ -422,7 +432,8 @@ void test_mix_responses_pushes()
// Push
std::string_view push2_buffer = ">2\r\n+other\r\n+push\r\n";
ret = mpx.consume_next(push2_buffer, ec);
read(mpx, push2_buffer);
ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 19u);
expected = {
@@ -439,7 +450,8 @@ void test_mix_responses_pushes()
// Second response
std::string_view response2_buffer = "$8\r\nResponse\r\n";
ret = mpx.consume_next(response2_buffer, ec);
read(mpx, response2_buffer);
ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, 14u);
expected = {
@@ -478,7 +490,8 @@ void test_cancel_waiting()
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
error_code ec;
auto res = mpx.consume_next("$11\r\nHello world\r\n", ec);
read(mpx, "$11\r\nHello world\r\n");
auto res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
@@ -509,12 +522,14 @@ void test_cancel_staged()
// The cancelled request's response arrives. It gets discarded
error_code ec;
auto res = mpx.consume_next("+Goodbye\r\n", ec);
read(mpx, "+Goodbye\r\n");
auto res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item2->done);
// The 2nd request's response arrives. It gets parsed successfully
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
read(mpx, "$11\r\nHello world\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
@@ -545,7 +560,8 @@ void test_cancel_staged_command_without_response()
// The 2nd request's response arrives. It gets parsed successfully
error_code ec;
auto res = mpx.consume_next("$11\r\nHello world\r\n", ec);
read(mpx, "$11\r\nHello world\r\n");
auto res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
@@ -574,12 +590,14 @@ void test_cancel_written()
// The cancelled request's response arrives. It gets discarded
error_code ec;
auto res = mpx.consume_next("+Goodbye\r\n", ec);
read(mpx, "+Goodbye\r\n");
auto res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item2->done);
// The 2nd request's response arrives. It gets parsed successfully
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
read(mpx, "$11\r\nHello world\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
@@ -609,13 +627,15 @@ void test_cancel_written_half_parsed_response()
// Get the response for the 1st command in req1
error_code ec;
auto res = mpx.consume_next("+Goodbye\r\n", ec);
read(mpx, "+Goodbye\r\n");
auto res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item1->done);
BOOST_TEST_EQ(ec, error_code());
// Get a partial response for the 2nd command in req1
res = mpx.consume_next("*2\r\n$4\r\nsome\r\n", ec);
read(mpx, "*2\r\n$4\r\nsome\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::needs_more);
BOOST_TEST_NOT(item1->done);
BOOST_TEST_EQ(ec, error_code());
@@ -625,19 +645,22 @@ void test_cancel_written_half_parsed_response()
item1.reset(); // Verify we don't reference this item anyhow
// Get the rest of the response for the 2nd command in req1
res = mpx.consume_next("*2\r\n$4\r\nsome\r\n$4\r\ndata\r\n", ec);
read(mpx, "*2\r\n$4\r\nsome\r\n$4\r\ndata\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item2->done);
BOOST_TEST_EQ(ec, error_code());
// Get the response for the 3rd command in req1
res = mpx.consume_next("+last\r\n", ec);
read(mpx, "+last\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_NOT(item2->done);
BOOST_TEST_EQ(ec, error_code());
// Get the response for the 2nd request
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
read(mpx, "$11\r\nHello world\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
@@ -672,23 +695,27 @@ void test_cancel_written_null_error()
// The cancelled request's response arrives. It contains NULLs and errors.
// We ignore them
error_code ec;
auto res = mpx.consume_next("-ERR wrong command\r\n", ec);
read(mpx, "-ERR wrong command\r\n");
auto res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_NOT(item2->done);
res = mpx.consume_next("!3\r\nBad\r\n", ec);
read(mpx, "!3\r\nBad\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_NOT(item2->done);
res = mpx.consume_next("_\r\n", ec);
read(mpx, "_\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_NOT(item2->done);
// The 2nd request's response arrives. It gets parsed successfully
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
read(mpx, "$11\r\nHello world\r\n");
res = mpx.consume(ec);
BOOST_TEST_EQ(res.first, consume_result::got_response);
BOOST_TEST(item2->done);
const node expected[] = {
@@ -812,7 +839,8 @@ void test_cancel_on_connection_lost_abandoned()
// mpx.add(item.elem_ptr);
// BOOST_TEST_EQ(mpx.prepare_write(), 1u);
// BOOST_TEST_EQ(mpx.commit_write(), 0u);
// auto res = mpx.consume_next("*2\r\n+hello\r\n", ec);
// read(mpx, "*2\r\n+hello\r\n");
// auto res = mpx.consume(ec);
// BOOST_TEST_EQ(res.first, consume_result::needs_more);
// BOOST_TEST_EQ(ec, error_code());
@@ -827,7 +855,8 @@ void test_cancel_on_connection_lost_abandoned()
// // Successful write, and this time the response is complete
// BOOST_TEST_EQ(mpx.prepare_write(), 1u);
// BOOST_TEST_EQ(mpx.commit_write(), 0u);
// res = mpx.consume_next("*2\r\n+hello\r\n+world\r\n", ec);
// read(mpx, "*2\r\n+hello\r\n+world\r\n");
// res = mpx.consume(ec);
// BOOST_TEST_EQ(res.first, consume_result::got_response);
// BOOST_TEST_EQ(ec, error_code());
@@ -854,7 +883,8 @@ void test_reset()
// Start parsing a push
error_code ec;
auto ret = mpx.consume_next(">2\r", ec);
read(mpx, ">2\r");
auto ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
// Connection lost. The first request gets cancelled
@@ -871,7 +901,8 @@ void test_reset()
BOOST_TEST_EQ(mpx.commit_write(), 0u);
std::string_view response_buffer = "$11\r\nHello world\r\n";
ret = mpx.consume_next(response_buffer, ec);
read(mpx, response_buffer);
ret = mpx.consume(ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, response_buffer.size());
const node expected[] = {

View File

@@ -5,7 +5,6 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/asio/cancellation_type.hpp>
@@ -21,9 +20,9 @@ using boost::system::error_code;
using net::cancellation_type_t;
using redis::detail::reader_fsm;
using redis::detail::multiplexer;
using redis::detail::read_buffer;
using redis::generic_response;
using redis::any_adapter;
using redis::config;
using action = redis::detail::reader_fsm::action;
namespace boost::redis::detail {
@@ -35,6 +34,20 @@ std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t)
os << to_string(t);
return os;
}
// Copy data into the multiplexer with the following steps
//
// 1. get_read_buffer
// 2. Copy data in the buffer from 2.
//
// This is used in the reader_fsm tests.
void copy_to(multiplexer& mpx, std::string_view data)
{
auto const buffer = mpx.get_prepared_read_buffer();
BOOST_ASSERT(buffer.size() >= data.size());
std::copy(data.cbegin(), data.cend(), buffer.begin());
}
} // namespace boost::redis::detail
// Operators
@@ -42,11 +55,10 @@ namespace {
void test_push()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
reader_fsm fsm{mpx};
error_code ec;
action act;
@@ -54,7 +66,7 @@ void test_push()
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// The fsm is asking for data.
std::string const payload =
@@ -62,7 +74,7 @@ void test_push()
">1\r\n+msg2 \r\n"
">1\r\n+msg3 \r\n";
append_read_data(rbuf, payload);
copy_to(mpx, payload);
// Deliver the 1st push
act = fsm.resume(payload.size(), ec, cancellation_type_t::none);
@@ -84,17 +96,16 @@ void test_push()
// All pushes were delivered so the fsm should demand more data
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act.ec_, error_code());
}
void test_read_needs_more()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
reader_fsm fsm{mpx};
error_code ec;
action act;
@@ -102,27 +113,27 @@ void test_read_needs_more()
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// Split the incoming message in three random parts and deliver
// them to the reader individually.
std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"};
// Passes the first part to the fsm.
append_read_data(rbuf, msg[0]);
copy_to(mpx, msg[0]);
act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::needs_more);
BOOST_TEST_EQ(act.ec_, error_code());
// Passes the second part to the fsm.
append_read_data(rbuf, msg[1]);
copy_to(mpx, msg[1]);
act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::needs_more);
BOOST_TEST_EQ(act.ec_, error_code());
// Passes the third and last part to the fsm, next it should ask us
// to deliver the message.
append_read_data(rbuf, msg[2]);
copy_to(mpx, msg[2]);
act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size());
@@ -130,17 +141,16 @@ void test_read_needs_more()
// All pushes were delivered so the fsm should demand more data
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
BOOST_TEST_EQ(act.type_, action::type::read_some);
BOOST_TEST_EQ(act.ec_, error_code());
}
void test_read_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
reader_fsm fsm{mpx};
error_code ec;
action act;
@@ -148,11 +158,11 @@ void test_read_error()
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// The fsm is asking for data.
std::string const payload = ">1\r\n+msg1\r\n";
append_read_data(rbuf, payload);
copy_to(mpx, payload);
// Deliver the data
act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none);
@@ -167,11 +177,10 @@ void test_read_error()
void test_parse_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
reader_fsm fsm{mpx};
error_code ec;
action act;
@@ -179,11 +188,11 @@ void test_parse_error()
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// The fsm is asking for data.
std::string const payload = ">a\r\n";
append_read_data(rbuf, payload);
copy_to(mpx, payload);
// Deliver the data
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
@@ -198,11 +207,10 @@ void test_parse_error()
void test_push_deliver_error()
{
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
reader_fsm fsm{mpx};
error_code ec;
action act;
@@ -210,11 +218,11 @@ void test_push_deliver_error()
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// The fsm is asking for data.
std::string const payload = ">1\r\n+msg1\r\n";
append_read_data(rbuf, payload);
copy_to(mpx, payload);
// Deliver the data
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
@@ -233,12 +241,15 @@ void test_push_deliver_error()
void test_max_read_buffer_size()
{
read_buffer rbuf;
rbuf.set_config({5, 7});
config cfg;
cfg.read_buffer_append_size = 5;
cfg.max_read_size = 7;
multiplexer mpx;
mpx.set_config(cfg);
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
reader_fsm fsm{rbuf, mpx};
reader_fsm fsm{mpx};
error_code ec;
action act;
@@ -246,11 +257,11 @@ void test_max_read_buffer_size()
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
act = fsm.resume(0, ec, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::append_some);
BOOST_TEST_EQ(act.type_, action::type::read_some);
// Passes the first part to the fsm.
std::string const part1 = ">3\r\n";
append_read_data(rbuf, part1);
copy_to(mpx, part1);
act = fsm.resume(part1.size(), {}, cancellation_type_t::none);
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
BOOST_TEST_EQ(act.ec_, error_code());