mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Merge pull request #311 from boostorg/refactoring_clean_code
Simplifies the read_buffer and add rotated bytes to usage.
This commit is contained in:
@@ -45,7 +45,6 @@
|
||||
#include <boost/asio/ssl/stream.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/config.hpp>
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
|
||||
@@ -85,7 +84,6 @@ struct connection_impl {
|
||||
config cfg_;
|
||||
multiplexer mpx_;
|
||||
connection_logger logger_;
|
||||
read_buffer read_buffer_;
|
||||
generic_response setup_resp_;
|
||||
request ping_req_;
|
||||
generic_response ping_resp_;
|
||||
@@ -139,10 +137,6 @@ struct connection_impl {
|
||||
{
|
||||
set_receive_adapter(any_adapter{ignore});
|
||||
writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
|
||||
|
||||
// Reserve some memory to avoid excessive memory allocations in
|
||||
// the first reads.
|
||||
read_buffer_.reserve(4096u);
|
||||
}
|
||||
|
||||
void cancel(operation op)
|
||||
@@ -263,7 +257,7 @@ struct reader_op {
|
||||
public:
|
||||
reader_op(connection_impl<Executor>& conn) noexcept
|
||||
: conn_{&conn}
|
||||
, fsm_{conn.read_buffer_, conn.mpx_}
|
||||
, fsm_{conn.mpx_}
|
||||
{ }
|
||||
|
||||
template <class Self>
|
||||
@@ -279,9 +273,9 @@ public:
|
||||
self.reset_cancellation_state(asio::enable_terminal_cancellation());
|
||||
continue;
|
||||
case reader_fsm::action::type::needs_more:
|
||||
case reader_fsm::action::type::append_some:
|
||||
case reader_fsm::action::type::read_some:
|
||||
{
|
||||
auto const buf = conn_->read_buffer_.get_append_buffer();
|
||||
auto const buf = conn_->mpx_.get_prepared_read_buffer();
|
||||
conn_->stream_.async_read_some(asio::buffer(buf), std::move(self));
|
||||
}
|
||||
return;
|
||||
@@ -547,7 +541,6 @@ public:
|
||||
|
||||
// If we were successful, run all the connection tasks
|
||||
if (!ec) {
|
||||
conn_->read_buffer_.clear();
|
||||
conn_->mpx_.reset();
|
||||
clear_response(conn_->setup_resp_);
|
||||
|
||||
@@ -757,7 +750,7 @@ public:
|
||||
auto async_run(config const& cfg, CompletionToken&& token = {})
|
||||
{
|
||||
impl_->cfg_ = cfg;
|
||||
impl_->read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size});
|
||||
impl_->mpx_.set_config(cfg);
|
||||
|
||||
return asio::async_compose<CompletionToken, void(system::error_code)>(
|
||||
detail::run_op<Executor>{impl_.get()},
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
|
||||
#include <boost/redis/adapter/adapt.hpp>
|
||||
#include <boost/redis/adapter/any_adapter.hpp>
|
||||
#include <boost/redis/config.hpp>
|
||||
#include <boost/redis/detail/read_buffer.hpp>
|
||||
#include <boost/redis/resp3/node.hpp>
|
||||
#include <boost/redis/resp3/parser.hpp>
|
||||
@@ -126,6 +127,8 @@ public:
|
||||
std::size_t read_size_;
|
||||
};
|
||||
|
||||
multiplexer();
|
||||
|
||||
// To be called before a write operation. Coalesces all available requests
|
||||
// into a single buffer. Returns the number of coalesced requests.
|
||||
// Must be called before cancel_on_conn_lost() because it might change
|
||||
@@ -144,8 +147,7 @@ public:
|
||||
// Must be called before cancel_on_conn_lost() because it might change
|
||||
// request status.
|
||||
[[nodiscard]]
|
||||
auto consume_next(std::string_view data, system::error_code& ec)
|
||||
-> std::pair<consume_result, std::size_t>;
|
||||
auto consume(system::error_code& ec) -> std::pair<consume_result, std::size_t>;
|
||||
|
||||
auto add(std::shared_ptr<elem> const& ptr) -> void;
|
||||
void cancel(std::shared_ptr<elem> const& ptr);
|
||||
@@ -178,6 +180,17 @@ public:
|
||||
return std::string_view{write_buffer_};
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_prepared_read_buffer() noexcept -> read_buffer::span_type;
|
||||
|
||||
[[nodiscard]]
|
||||
auto prepare_read() noexcept -> system::error_code;
|
||||
|
||||
void commit_read(std::size_t read_size);
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_read_buffer_size() const noexcept -> std::size_t;
|
||||
|
||||
void set_receive_adapter(any_adapter adapter);
|
||||
|
||||
[[nodiscard]]
|
||||
@@ -189,8 +202,10 @@ public:
|
||||
[[nodiscard]]
|
||||
auto is_writing() const noexcept -> bool;
|
||||
|
||||
void set_config(config const& cfg);
|
||||
|
||||
private:
|
||||
void commit_usage(bool is_push, std::size_t size);
|
||||
void commit_usage(bool is_push, read_buffer::consume_result res);
|
||||
|
||||
[[nodiscard]]
|
||||
auto is_next_push(std::string_view data) const noexcept -> bool;
|
||||
@@ -200,8 +215,9 @@ private:
|
||||
auto release_push_requests() -> std::size_t;
|
||||
|
||||
[[nodiscard]]
|
||||
consume_result consume_next_impl(std::string_view data, system::error_code& ec);
|
||||
consume_result consume_impl(system::error_code& ec);
|
||||
|
||||
read_buffer read_buffer_;
|
||||
std::string write_buffer_;
|
||||
std::deque<std::shared_ptr<elem>> reqs_;
|
||||
resp3::parser parser_{};
|
||||
|
||||
@@ -21,30 +21,34 @@ class read_buffer {
|
||||
public:
|
||||
using span_type = span<char>;
|
||||
|
||||
struct consume_result {
|
||||
std::size_t consumed;
|
||||
std::size_t rotated;
|
||||
};
|
||||
|
||||
// See config.hpp for the meaning of these parameters.
|
||||
struct config {
|
||||
std::size_t read_buffer_append_size = 4096u;
|
||||
std::size_t max_read_size = static_cast<std::size_t>(-1);
|
||||
};
|
||||
|
||||
// Prepare the buffer to receive more data.
|
||||
[[nodiscard]]
|
||||
auto prepare_append() -> system::error_code;
|
||||
auto prepare() -> system::error_code;
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_append_buffer() noexcept -> span_type;
|
||||
auto get_prepared() noexcept -> span_type;
|
||||
|
||||
void commit_append(std::size_t read_size);
|
||||
void commit(std::size_t read_size);
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_committed_buffer() const noexcept -> std::string_view;
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_committed_size() const noexcept -> std::size_t;
|
||||
auto get_commited() const noexcept -> std::string_view;
|
||||
|
||||
void clear();
|
||||
|
||||
// Consume committed data.
|
||||
auto consume_committed(std::size_t size) -> std::size_t;
|
||||
// Consumes committed data by rotating the remaining data to the
|
||||
// front of the buffer.
|
||||
auto consume(std::size_t size) -> consume_result;
|
||||
|
||||
void reserve(std::size_t n);
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ public:
|
||||
enum class type
|
||||
{
|
||||
setup_cancellation,
|
||||
append_some,
|
||||
read_some,
|
||||
needs_more,
|
||||
notify_push_receiver,
|
||||
cancel_run,
|
||||
@@ -35,7 +35,7 @@ public:
|
||||
system::error_code ec_ = {};
|
||||
};
|
||||
|
||||
explicit reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept;
|
||||
explicit reader_fsm(multiplexer& mpx) noexcept;
|
||||
|
||||
action resume(
|
||||
std::size_t bytes_read,
|
||||
@@ -44,9 +44,8 @@ public:
|
||||
|
||||
private:
|
||||
int resume_point_{0};
|
||||
read_buffer* read_buffer_ = nullptr;
|
||||
action action_after_resume_;
|
||||
action::type next_read_type_ = action::type::append_some;
|
||||
action::type next_read_type_ = action::type::read_some;
|
||||
multiplexer* mpx_ = nullptr;
|
||||
std::pair<consume_result, std::size_t> res_{consume_result::needs_more, 0u};
|
||||
};
|
||||
|
||||
@@ -26,7 +26,7 @@ auto to_string(reader_fsm::action::type t) noexcept -> char const*
|
||||
{
|
||||
switch (t) {
|
||||
BOOST_REDIS_READER_SWITCH_CASE(setup_cancellation);
|
||||
BOOST_REDIS_READER_SWITCH_CASE(append_some);
|
||||
BOOST_REDIS_READER_SWITCH_CASE(read_some);
|
||||
BOOST_REDIS_READER_SWITCH_CASE(needs_more);
|
||||
BOOST_REDIS_READER_SWITCH_CASE(notify_push_receiver);
|
||||
BOOST_REDIS_READER_SWITCH_CASE(cancel_run);
|
||||
|
||||
@@ -46,6 +46,13 @@ void multiplexer::elem::mark_abandoned()
|
||||
set_done_callback([] { });
|
||||
}
|
||||
|
||||
multiplexer::multiplexer()
|
||||
{
|
||||
// Reserve some memory to avoid excessive memory allocations in
|
||||
// the first reads.
|
||||
read_buffer_.reserve(4096u);
|
||||
}
|
||||
|
||||
void multiplexer::cancel(std::shared_ptr<elem> const& ptr)
|
||||
{
|
||||
if (ptr->is_waiting()) {
|
||||
@@ -61,6 +68,7 @@ void multiplexer::cancel(std::shared_ptr<elem> const& ptr)
|
||||
std::size_t multiplexer::commit_write()
|
||||
{
|
||||
BOOST_ASSERT(!cancel_run_called_);
|
||||
usage_.bytes_sent += std::size(write_buffer_);
|
||||
|
||||
// We have to clear the payload right after writing it to use it
|
||||
// as a flag that informs there is no ongoing write.
|
||||
@@ -93,7 +101,7 @@ void multiplexer::add(std::shared_ptr<elem> const& info)
|
||||
}
|
||||
}
|
||||
|
||||
consume_result multiplexer::consume_next_impl(std::string_view data, system::error_code& ec)
|
||||
consume_result multiplexer::consume_impl(system::error_code& ec)
|
||||
{
|
||||
// We arrive here in two states:
|
||||
//
|
||||
@@ -106,7 +114,9 @@ consume_result multiplexer::consume_next_impl(std::string_view data, system::err
|
||||
// whether the next message is a push or a response.
|
||||
//
|
||||
|
||||
auto const data = read_buffer_.get_commited();
|
||||
BOOST_ASSERT(!data.empty());
|
||||
|
||||
if (!on_push_) // Prepare for new message.
|
||||
on_push_ = is_next_push(data);
|
||||
|
||||
@@ -141,13 +151,12 @@ consume_result multiplexer::consume_next_impl(std::string_view data, system::err
|
||||
return consume_result::got_response;
|
||||
}
|
||||
|
||||
std::pair<consume_result, std::size_t> multiplexer::consume_next(
|
||||
std::string_view data,
|
||||
system::error_code& ec)
|
||||
std::pair<consume_result, std::size_t>
|
||||
multiplexer::consume(system::error_code& ec)
|
||||
{
|
||||
BOOST_ASSERT(!cancel_run_called_);
|
||||
|
||||
auto const ret = consume_next_impl(data, ec);
|
||||
auto const ret = consume_impl(ec);
|
||||
auto const consumed = parser_.get_consumed();
|
||||
if (ec) {
|
||||
return std::make_pair(ret, consumed);
|
||||
@@ -155,15 +164,37 @@ std::pair<consume_result, std::size_t> multiplexer::consume_next(
|
||||
|
||||
if (ret != consume_result::needs_more) {
|
||||
parser_.reset();
|
||||
commit_usage(ret == consume_result::got_push, consumed);
|
||||
return std::make_pair(ret, consumed);
|
||||
auto const res = read_buffer_.consume(consumed);
|
||||
commit_usage(ret == consume_result::got_push, res);
|
||||
return std::make_pair(ret, res.consumed);
|
||||
}
|
||||
|
||||
return std::make_pair(consume_result::needs_more, consumed);
|
||||
}
|
||||
|
||||
auto multiplexer::prepare_read() noexcept -> system::error_code
|
||||
{
|
||||
return read_buffer_.prepare();
|
||||
}
|
||||
|
||||
auto multiplexer::get_prepared_read_buffer() noexcept -> read_buffer::span_type
|
||||
{
|
||||
return read_buffer_.get_prepared();
|
||||
}
|
||||
|
||||
void multiplexer::commit_read(std::size_t bytes_read)
|
||||
{
|
||||
read_buffer_.commit(bytes_read);
|
||||
}
|
||||
|
||||
auto multiplexer::get_read_buffer_size() const noexcept -> std::size_t
|
||||
{
|
||||
return read_buffer_.get_commited().size();
|
||||
}
|
||||
|
||||
void multiplexer::reset()
|
||||
{
|
||||
read_buffer_.clear();
|
||||
write_buffer_.clear();
|
||||
parser_.reset();
|
||||
on_push_ = false;
|
||||
@@ -191,8 +222,6 @@ std::size_t multiplexer::prepare_write()
|
||||
usage_.commands_sent += ri->get_request().get_commands();
|
||||
});
|
||||
|
||||
usage_.bytes_sent += std::size(write_buffer_);
|
||||
|
||||
auto const d = std::distance(point, std::cend(reqs_));
|
||||
return static_cast<std::size_t>(d);
|
||||
}
|
||||
@@ -252,16 +281,18 @@ void multiplexer::cancel_on_conn_lost()
|
||||
});
|
||||
}
|
||||
|
||||
void multiplexer::commit_usage(bool is_push, std::size_t size)
|
||||
void multiplexer::commit_usage(bool is_push, read_buffer::consume_result res)
|
||||
{
|
||||
if (is_push) {
|
||||
usage_.pushes_received += 1;
|
||||
usage_.push_bytes_received += size;
|
||||
usage_.push_bytes_received += res.consumed;
|
||||
on_push_ = false;
|
||||
} else {
|
||||
usage_.responses_received += 1;
|
||||
usage_.response_bytes_received += size;
|
||||
usage_.response_bytes_received += res.consumed;
|
||||
}
|
||||
|
||||
usage_.bytes_rotated += res.rotated;
|
||||
}
|
||||
|
||||
bool multiplexer::is_next_push(std::string_view data) const noexcept
|
||||
@@ -325,6 +356,11 @@ void multiplexer::set_receive_adapter(any_adapter adapter)
|
||||
receive_adapter_ = std::move(adapter);
|
||||
}
|
||||
|
||||
void multiplexer::set_config(config const& cfg)
|
||||
{
|
||||
read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size});
|
||||
}
|
||||
|
||||
auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr<multiplexer::elem>
|
||||
{
|
||||
return std::make_shared<multiplexer::elem>(req, std::move(adapter));
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
system::error_code read_buffer::prepare_append()
|
||||
system::error_code read_buffer::prepare()
|
||||
{
|
||||
BOOST_ASSERT(append_buf_begin_ == buffer_.size());
|
||||
|
||||
@@ -27,34 +27,32 @@ system::error_code read_buffer::prepare_append()
|
||||
return {};
|
||||
}
|
||||
|
||||
void read_buffer::commit_append(std::size_t read_size)
|
||||
void read_buffer::commit(std::size_t read_size)
|
||||
{
|
||||
BOOST_ASSERT(buffer_.size() >= (append_buf_begin_ + read_size));
|
||||
buffer_.resize(append_buf_begin_ + read_size);
|
||||
append_buf_begin_ = buffer_.size();
|
||||
}
|
||||
|
||||
auto read_buffer::get_append_buffer() noexcept -> span_type
|
||||
auto read_buffer::get_prepared() noexcept -> span_type
|
||||
{
|
||||
auto const size = buffer_.size();
|
||||
return make_span(buffer_.data() + append_buf_begin_, size - append_buf_begin_);
|
||||
}
|
||||
|
||||
auto read_buffer::get_committed_buffer() const noexcept -> std::string_view
|
||||
auto read_buffer::get_commited() const noexcept -> std::string_view
|
||||
{
|
||||
BOOST_ASSERT(!buffer_.empty());
|
||||
return {buffer_.data(), append_buf_begin_};
|
||||
}
|
||||
|
||||
auto read_buffer::get_committed_size() const noexcept -> std::size_t { return append_buf_begin_; }
|
||||
|
||||
void read_buffer::clear()
|
||||
{
|
||||
buffer_.clear();
|
||||
append_buf_begin_ = 0;
|
||||
}
|
||||
|
||||
std::size_t read_buffer::consume_committed(std::size_t size)
|
||||
read_buffer::consume_result
|
||||
read_buffer::consume(std::size_t size)
|
||||
{
|
||||
// For convenience, if the requested size is larger than the
|
||||
// committed buffer we cap it to the maximum.
|
||||
@@ -62,9 +60,12 @@ std::size_t read_buffer::consume_committed(std::size_t size)
|
||||
size = append_buf_begin_;
|
||||
|
||||
buffer_.erase(buffer_.begin(), buffer_.begin() + size);
|
||||
auto const rotated = size == 0u ? 0u : buffer_.size();
|
||||
|
||||
BOOST_ASSERT(append_buf_begin_ >= size);
|
||||
append_buf_begin_ -= size;
|
||||
return size;
|
||||
|
||||
return {size, rotated};
|
||||
}
|
||||
|
||||
void read_buffer::reserve(std::size_t n) { buffer_.reserve(n); }
|
||||
|
||||
@@ -6,14 +6,12 @@
|
||||
|
||||
#include <boost/redis/detail/coroutine.hpp>
|
||||
#include <boost/redis/detail/multiplexer.hpp>
|
||||
#include <boost/redis/detail/read_buffer.hpp>
|
||||
#include <boost/redis/detail/reader_fsm.hpp>
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
reader_fsm::reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept
|
||||
: read_buffer_{&rbuf}
|
||||
, mpx_{&mpx}
|
||||
reader_fsm::reader_fsm(multiplexer& mpx) noexcept
|
||||
: mpx_{&mpx}
|
||||
{ }
|
||||
|
||||
reader_fsm::action reader_fsm::resume(
|
||||
@@ -26,7 +24,7 @@ reader_fsm::action reader_fsm::resume(
|
||||
BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation)
|
||||
|
||||
for (;;) {
|
||||
ec = read_buffer_->prepare_append();
|
||||
ec = mpx_->prepare_read();
|
||||
if (ec) {
|
||||
action_after_resume_ = {action::type::done, 0, ec};
|
||||
BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run)
|
||||
@@ -34,7 +32,7 @@ reader_fsm::action reader_fsm::resume(
|
||||
}
|
||||
|
||||
BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_)
|
||||
read_buffer_->commit_append(bytes_read);
|
||||
mpx_->commit_read(bytes_read);
|
||||
if (ec) {
|
||||
// TODO: If an error occurred but data was read (i.e.
|
||||
// bytes_read != 0) we should try to process that data and
|
||||
@@ -44,9 +42,9 @@ reader_fsm::action reader_fsm::resume(
|
||||
return action_after_resume_;
|
||||
}
|
||||
|
||||
next_read_type_ = action::type::append_some;
|
||||
while (read_buffer_->get_committed_size() != 0) {
|
||||
res_ = mpx_->consume_next(read_buffer_->get_committed_buffer(), ec);
|
||||
next_read_type_ = action::type::read_some;
|
||||
while (mpx_->get_read_buffer_size() != 0) {
|
||||
res_ = mpx_->consume(ec);
|
||||
if (ec) {
|
||||
// TODO: Perhaps log what has not been consumed to aid
|
||||
// debugging.
|
||||
@@ -60,8 +58,6 @@ reader_fsm::action reader_fsm::resume(
|
||||
break;
|
||||
}
|
||||
|
||||
read_buffer_->consume_committed(res_.second);
|
||||
|
||||
if (res_.first == consume_result::got_push) {
|
||||
BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second)
|
||||
if (ec) {
|
||||
|
||||
@@ -36,6 +36,9 @@ struct usage {
|
||||
|
||||
/// Number of push-bytes received.
|
||||
std::size_t push_bytes_received = 0;
|
||||
|
||||
/// Number of bytes rotated in the read buffer.
|
||||
std::size_t bytes_rotated = 0;
|
||||
};
|
||||
|
||||
} // namespace boost::redis
|
||||
|
||||
@@ -15,7 +15,7 @@ target_compile_features(boost_redis_src PRIVATE cxx_std_17)
|
||||
target_link_libraries(boost_redis_src PRIVATE boost_redis_project_options)
|
||||
|
||||
# Test utils
|
||||
add_library(boost_redis_tests_common STATIC common.cpp)
|
||||
add_library(boost_redis_tests_common STATIC common.cpp sansio_utils.cpp)
|
||||
target_compile_features(boost_redis_tests_common PRIVATE cxx_std_17)
|
||||
target_link_libraries(boost_redis_tests_common PRIVATE boost_redis_project_options)
|
||||
|
||||
|
||||
@@ -42,6 +42,7 @@ lib redis_test_common
|
||||
:
|
||||
boost_redis.cpp
|
||||
common.cpp
|
||||
sansio_utils.cpp
|
||||
: requirements $(requirements)
|
||||
: usage-requirements $(requirements)
|
||||
;
|
||||
|
||||
@@ -68,10 +68,3 @@ void run_coroutine_test(net::awaitable<void> op, std::chrono::steady_clock::dura
|
||||
throw std::runtime_error("Coroutine test did not finish");
|
||||
}
|
||||
#endif // BOOST_ASIO_HAS_CO_AWAIT
|
||||
|
||||
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data)
|
||||
{
|
||||
auto const buffer = rbuf.get_append_buffer();
|
||||
BOOST_ASSERT(data.size() <= buffer.size());
|
||||
std::copy(data.begin(), data.end(), buffer.begin());
|
||||
}
|
||||
|
||||
@@ -35,5 +35,3 @@ void run(
|
||||
boost::redis::config cfg = make_test_config(),
|
||||
boost::system::error_code ec = boost::asio::error::operation_aborted,
|
||||
boost::redis::operation op = boost::redis::operation::receive);
|
||||
|
||||
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data);
|
||||
|
||||
25
test/sansio_utils.cpp
Normal file
25
test/sansio_utils.cpp
Normal file
@@ -0,0 +1,25 @@
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include "sansio_utils.hpp"
|
||||
|
||||
#include <boost/redis/detail/multiplexer.hpp>
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
void read(multiplexer& mpx, std::string_view data)
|
||||
{
|
||||
auto const ec = mpx.prepare_read();
|
||||
ignore_unused(ec);
|
||||
BOOST_ASSERT(ec == system::error_code{});
|
||||
auto const buffer = mpx.get_prepared_read_buffer();
|
||||
BOOST_ASSERT(buffer.size() >= data.size());
|
||||
std::copy(data.cbegin(), data.cend(), buffer.begin());
|
||||
mpx.commit_read(data.size());
|
||||
}
|
||||
|
||||
} // namespace boost::redis::detail
|
||||
28
test/sansio_utils.hpp
Normal file
28
test/sansio_utils.hpp
Normal file
@@ -0,0 +1,28 @@
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#ifndef BOOST_REDIS_TEST_SANSIO_UTILS_HPP
|
||||
#define BOOST_REDIS_TEST_SANSIO_UTILS_HPP
|
||||
|
||||
#include <string_view>
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
class multiplexer;
|
||||
|
||||
// Read data into the multiplexer with the following steps
|
||||
//
|
||||
// 1. prepare_read
|
||||
// 2. get_read_buffer
|
||||
// 3. Copy data in the buffer from 2.
|
||||
// 4. commit_read;
|
||||
//
|
||||
// This is used in the multiplexer tests.
|
||||
void read(multiplexer& mpx, std::string_view data);
|
||||
|
||||
} // namespace boost::redis::detail
|
||||
|
||||
#endif // BOOST_REDIS_TEST_SANSIO_UTILS_HPP
|
||||
@@ -43,8 +43,9 @@ std::ostream& operator<<(std::ostream& os, usage const& u)
|
||||
<< "Bytes sent: " << u.bytes_sent << "\n"
|
||||
<< "Responses received: " << u.responses_received << "\n"
|
||||
<< "Pushes received: " << u.pushes_received << "\n"
|
||||
<< "Response bytes received: " << u.response_bytes_received << "\n"
|
||||
<< "Push bytes received: " << u.push_bytes_received;
|
||||
<< "Bytes received (response): " << u.response_bytes_received << "\n"
|
||||
<< "Bytes received (push): " << u.push_bytes_received << "\n"
|
||||
<< "Bytes rotated: " << u.bytes_rotated;
|
||||
|
||||
return os;
|
||||
}
|
||||
|
||||
@@ -14,12 +14,15 @@
|
||||
#include <boost/asio/error.hpp>
|
||||
#include <boost/core/lightweight_test.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
#include <boost/assert.hpp>
|
||||
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
#include <ostream>
|
||||
#include <utility>
|
||||
|
||||
#include "sansio_utils.hpp"
|
||||
|
||||
using namespace boost::redis;
|
||||
namespace asio = boost::asio;
|
||||
using detail::exec_fsm;
|
||||
@@ -125,7 +128,8 @@ void test_success()
|
||||
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
|
||||
|
||||
// Simulate a successful read
|
||||
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
|
||||
read(mpx, "$5\r\nhello\r\n");
|
||||
auto req_status = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
|
||||
@@ -166,7 +170,8 @@ void test_parse_error()
|
||||
// The second field should be a number (rather than the empty string).
|
||||
// Note that although part of the buffer was consumed, the multiplexer
|
||||
// currently throws this information away.
|
||||
auto req_status = mpx.consume_next("*2\r\n$5\r\nhello\r\n:\r\n", ec);
|
||||
read(mpx, "*2\r\n$5\r\nhello\r\n:\r\n");
|
||||
auto req_status = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ec, error::empty_field);
|
||||
BOOST_TEST_EQ(req_status.second, 15u);
|
||||
BOOST_TEST_EQ(input.done_calls, 1u);
|
||||
@@ -224,7 +229,8 @@ void test_not_connected()
|
||||
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
|
||||
|
||||
// Simulate a successful read
|
||||
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
|
||||
read(mpx, "$5\r\nhello\r\n");
|
||||
auto req_status = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
|
||||
@@ -317,9 +323,10 @@ void test_cancel_notwaiting_terminal_partial()
|
||||
BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted));
|
||||
input.reset(); // Verify we don't access the request or response after completion
|
||||
|
||||
// When the response to this request arrives, it gets ignored
|
||||
error_code ec;
|
||||
auto res = mpx.consume_next("-ERR wrong command\r\n", ec);
|
||||
// When the response to this request arrives, it gets ignored
|
||||
read(mpx, "-ERR wrong command\r\n");
|
||||
auto res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ_MSG(ec, error_code(), tc.name);
|
||||
BOOST_TEST_EQ_MSG(res.first, consume_result::got_response, tc.name);
|
||||
|
||||
@@ -355,7 +362,8 @@ void test_cancel_notwaiting_total()
|
||||
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
|
||||
|
||||
// Simulate a successful read
|
||||
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
|
||||
read(mpx, "$5\r\nhello\r\n");
|
||||
auto req_status = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
|
||||
|
||||
@@ -197,22 +197,22 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_error)
|
||||
|
||||
// Usual case, max size is bigger then requested size.
|
||||
buf.set_config({10, 10});
|
||||
auto ec = buf.prepare_append();
|
||||
auto ec = buf.prepare();
|
||||
BOOST_TEST(!ec);
|
||||
buf.commit_append(10);
|
||||
buf.commit(10);
|
||||
|
||||
// Corner case, max size is equal to the requested size.
|
||||
buf.set_config({10, 20});
|
||||
ec = buf.prepare_append();
|
||||
ec = buf.prepare();
|
||||
BOOST_TEST(!ec);
|
||||
buf.commit_append(10);
|
||||
buf.consume_committed(20);
|
||||
buf.commit(10);
|
||||
buf.consume(20);
|
||||
|
||||
auto const tmp = buf;
|
||||
|
||||
// Error case, max size is smaller to the requested size.
|
||||
buf.set_config({10, 9});
|
||||
ec = buf.prepare_append();
|
||||
ec = buf.prepare();
|
||||
BOOST_TEST(ec == error_code{boost::redis::error::exceeds_maximum_read_buffer_size});
|
||||
|
||||
// Check that an error call has no side effects.
|
||||
@@ -227,19 +227,34 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_consume_only_committed_data)
|
||||
read_buffer buf;
|
||||
|
||||
buf.set_config({10, 10});
|
||||
auto ec = buf.prepare_append();
|
||||
auto ec = buf.prepare();
|
||||
BOOST_TEST(!ec);
|
||||
|
||||
auto res = buf.consume(5);
|
||||
|
||||
// No data has been committed yet so nothing can be consummed.
|
||||
BOOST_CHECK_EQUAL(buf.consume_committed(5), 0u);
|
||||
BOOST_CHECK_EQUAL(res.consumed, 0u);
|
||||
|
||||
buf.commit_append(10);
|
||||
// If nothing was consumed, nothing got rotated.
|
||||
BOOST_CHECK_EQUAL(res.rotated, 0u);
|
||||
|
||||
// All five bytes can be consumed.
|
||||
BOOST_CHECK_EQUAL(buf.consume_committed(5), 5u);
|
||||
buf.commit(10);
|
||||
res = buf.consume(5);
|
||||
|
||||
// All five bytes should have been consumed.
|
||||
BOOST_CHECK_EQUAL(res.consumed, 5u);
|
||||
|
||||
// We added a total of 10 bytes and consumed 5, that means, 5 were
|
||||
// rotated.
|
||||
BOOST_CHECK_EQUAL(res.rotated, 5u);
|
||||
|
||||
res = buf.consume(7);
|
||||
|
||||
// Only the remaining five bytes can be consumed
|
||||
BOOST_CHECK_EQUAL(buf.consume_committed(7), 5u);
|
||||
BOOST_CHECK_EQUAL(res.consumed, 5u);
|
||||
|
||||
// No bytes to rotated.
|
||||
BOOST_CHECK_EQUAL(res.rotated, 0u);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
|
||||
@@ -249,10 +264,10 @@ BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
|
||||
read_buffer buf;
|
||||
|
||||
buf.set_config({10, 10});
|
||||
auto ec = buf.prepare_append();
|
||||
auto ec = buf.prepare();
|
||||
BOOST_TEST(!ec);
|
||||
|
||||
BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u);
|
||||
BOOST_CHECK_EQUAL(buf.get_prepared().size(), 10u);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(check_counter_adapter)
|
||||
|
||||
@@ -21,6 +21,8 @@
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
#include "sansio_utils.hpp"
|
||||
|
||||
using boost::redis::request;
|
||||
using boost::redis::detail::multiplexer;
|
||||
using boost::redis::detail::consume_result;
|
||||
@@ -122,11 +124,13 @@ void test_request_needs_more()
|
||||
|
||||
// Parse part of the response
|
||||
error_code ec;
|
||||
auto ret = mpx.consume_next("$11\r\nhello", ec);
|
||||
read(mpx, "$11\r\nhello");
|
||||
auto ret = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
|
||||
|
||||
// Parse the rest of it
|
||||
ret = mpx.consume_next("$11\r\nhello world\r\n", ec);
|
||||
read(mpx, " world\r\n");
|
||||
ret = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_response);
|
||||
const node expected[] = {
|
||||
{type::blob_string, 1u, 0u, "hello world"},
|
||||
@@ -191,7 +195,8 @@ void test_several_requests()
|
||||
|
||||
// Consumes the next message in the read buffer.
|
||||
error_code ec;
|
||||
auto ret = mpx.consume_next("+one\r\n", ec);
|
||||
read(mpx, "+one\r\n");
|
||||
auto ret = mpx.consume(ec);
|
||||
|
||||
// The read operation should have been successful.
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_response);
|
||||
@@ -204,7 +209,8 @@ void test_several_requests()
|
||||
|
||||
// Consumes the second message in the read buffer
|
||||
// Consumes the next message in the read buffer.
|
||||
ret = mpx.consume_next("+two\r\n", ec);
|
||||
read(mpx, "+two\r\n");
|
||||
ret = mpx.consume(ec);
|
||||
|
||||
// The read operation should have been successful.
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_response);
|
||||
@@ -234,7 +240,8 @@ void test_request_response_before_write()
|
||||
// The response is received. The request is marked as done,
|
||||
// even if the write hasn't been confirmed yet
|
||||
error_code ec;
|
||||
auto ret = mpx.consume_next("+one\r\n", ec);
|
||||
read(mpx, "+one\r\n");
|
||||
auto ret = mpx.consume(ec);
|
||||
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
@@ -256,7 +263,8 @@ void test_push()
|
||||
|
||||
// Consume an entire push
|
||||
error_code ec;
|
||||
auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec);
|
||||
read(mpx, ">2\r\n+one\r\n+two\r\n");
|
||||
auto const ret = mpx.consume(ec);
|
||||
|
||||
// Check
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_push);
|
||||
@@ -275,20 +283,17 @@ void test_push_needs_more()
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
std::string msg;
|
||||
|
||||
// Only part of the message available.
|
||||
msg += ">2\r\n+one\r";
|
||||
|
||||
// Consume it
|
||||
// Consume it only part of the message available.
|
||||
error_code ec;
|
||||
auto ret = mpx.consume_next(msg, ec);
|
||||
read(mpx, ">2\r\n+one\r");
|
||||
auto ret = mpx.consume(ec);
|
||||
|
||||
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
|
||||
|
||||
// The entire message becomes available
|
||||
msg += "\n+two\r\n";
|
||||
ret = mpx.consume_next(msg, ec);
|
||||
read(mpx, "\n+two\r\n");
|
||||
ret = mpx.consume(ec);
|
||||
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_push);
|
||||
BOOST_TEST_EQ(ret.second, 16u);
|
||||
@@ -311,7 +316,8 @@ void test_push_heuristics_no_request()
|
||||
|
||||
// Response received, but no request has been sent
|
||||
error_code ec;
|
||||
auto const ret = mpx.consume_next("+Hello world\r\n", ec);
|
||||
read(mpx, "+Hello world\r\n");
|
||||
auto const ret = mpx.consume(ec);
|
||||
|
||||
// Check
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_push);
|
||||
@@ -337,7 +343,8 @@ void test_push_heuristics_request_waiting()
|
||||
|
||||
// Response received, but no request has been sent
|
||||
error_code ec;
|
||||
auto const ret = mpx.consume_next("+Hello world\r\n", ec);
|
||||
read(mpx, "+Hello world\r\n");
|
||||
auto const ret = mpx.consume(ec);
|
||||
|
||||
// Check
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_push);
|
||||
@@ -366,7 +373,8 @@ void test_push_heuristics_request_without_response()
|
||||
|
||||
// Response received (e.g. syntax error)
|
||||
error_code ec;
|
||||
auto const ret = mpx.consume_next("-ERR wrong syntax\r\n", ec);
|
||||
read(mpx, "-ERR wrong syntax\r\n");
|
||||
auto const ret = mpx.consume(ec);
|
||||
|
||||
// Check
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_push);
|
||||
@@ -396,7 +404,8 @@ void test_mix_responses_pushes()
|
||||
// Push
|
||||
std::string_view push1_buffer = ">2\r\n+one\r\n+two\r\n";
|
||||
error_code ec;
|
||||
auto ret = mpx.consume_next(push1_buffer, ec);
|
||||
read(mpx, push1_buffer);
|
||||
auto ret = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_push);
|
||||
BOOST_TEST_EQ(ret.second, 16u);
|
||||
std::vector<node> expected{
|
||||
@@ -410,7 +419,8 @@ void test_mix_responses_pushes()
|
||||
|
||||
// First response
|
||||
std::string_view response1_buffer = "$11\r\nHello world\r\n";
|
||||
ret = mpx.consume_next(response1_buffer, ec);
|
||||
read(mpx, response1_buffer);
|
||||
ret = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(ret.second, 18u);
|
||||
expected = {
|
||||
@@ -422,7 +432,8 @@ void test_mix_responses_pushes()
|
||||
|
||||
// Push
|
||||
std::string_view push2_buffer = ">2\r\n+other\r\n+push\r\n";
|
||||
ret = mpx.consume_next(push2_buffer, ec);
|
||||
read(mpx, push2_buffer);
|
||||
ret = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_push);
|
||||
BOOST_TEST_EQ(ret.second, 19u);
|
||||
expected = {
|
||||
@@ -439,7 +450,8 @@ void test_mix_responses_pushes()
|
||||
|
||||
// Second response
|
||||
std::string_view response2_buffer = "$8\r\nResponse\r\n";
|
||||
ret = mpx.consume_next(response2_buffer, ec);
|
||||
read(mpx, response2_buffer);
|
||||
ret = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(ret.second, 14u);
|
||||
expected = {
|
||||
@@ -478,7 +490,8 @@ void test_cancel_waiting()
|
||||
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
|
||||
BOOST_TEST_EQ(mpx.commit_write(), 0u);
|
||||
error_code ec;
|
||||
auto res = mpx.consume_next("$11\r\nHello world\r\n", ec);
|
||||
read(mpx, "$11\r\nHello world\r\n");
|
||||
auto res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST(item2->done);
|
||||
const node expected[] = {
|
||||
@@ -509,12 +522,14 @@ void test_cancel_staged()
|
||||
|
||||
// The cancelled request's response arrives. It gets discarded
|
||||
error_code ec;
|
||||
auto res = mpx.consume_next("+Goodbye\r\n", ec);
|
||||
read(mpx, "+Goodbye\r\n");
|
||||
auto res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST_NOT(item2->done);
|
||||
|
||||
// The 2nd request's response arrives. It gets parsed successfully
|
||||
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
|
||||
read(mpx, "$11\r\nHello world\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST(item2->done);
|
||||
const node expected[] = {
|
||||
@@ -545,7 +560,8 @@ void test_cancel_staged_command_without_response()
|
||||
|
||||
// The 2nd request's response arrives. It gets parsed successfully
|
||||
error_code ec;
|
||||
auto res = mpx.consume_next("$11\r\nHello world\r\n", ec);
|
||||
read(mpx, "$11\r\nHello world\r\n");
|
||||
auto res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST(item2->done);
|
||||
const node expected[] = {
|
||||
@@ -574,12 +590,14 @@ void test_cancel_written()
|
||||
|
||||
// The cancelled request's response arrives. It gets discarded
|
||||
error_code ec;
|
||||
auto res = mpx.consume_next("+Goodbye\r\n", ec);
|
||||
read(mpx, "+Goodbye\r\n");
|
||||
auto res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST_NOT(item2->done);
|
||||
|
||||
// The 2nd request's response arrives. It gets parsed successfully
|
||||
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
|
||||
read(mpx, "$11\r\nHello world\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST(item2->done);
|
||||
const node expected[] = {
|
||||
@@ -609,13 +627,15 @@ void test_cancel_written_half_parsed_response()
|
||||
|
||||
// Get the response for the 1st command in req1
|
||||
error_code ec;
|
||||
auto res = mpx.consume_next("+Goodbye\r\n", ec);
|
||||
read(mpx, "+Goodbye\r\n");
|
||||
auto res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST_NOT(item1->done);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
|
||||
// Get a partial response for the 2nd command in req1
|
||||
res = mpx.consume_next("*2\r\n$4\r\nsome\r\n", ec);
|
||||
read(mpx, "*2\r\n$4\r\nsome\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::needs_more);
|
||||
BOOST_TEST_NOT(item1->done);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
@@ -625,19 +645,22 @@ void test_cancel_written_half_parsed_response()
|
||||
item1.reset(); // Verify we don't reference this item anyhow
|
||||
|
||||
// Get the rest of the response for the 2nd command in req1
|
||||
res = mpx.consume_next("*2\r\n$4\r\nsome\r\n$4\r\ndata\r\n", ec);
|
||||
read(mpx, "*2\r\n$4\r\nsome\r\n$4\r\ndata\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST_NOT(item2->done);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
|
||||
// Get the response for the 3rd command in req1
|
||||
res = mpx.consume_next("+last\r\n", ec);
|
||||
read(mpx, "+last\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST_NOT(item2->done);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
|
||||
// Get the response for the 2nd request
|
||||
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
|
||||
read(mpx, "$11\r\nHello world\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST(item2->done);
|
||||
const node expected[] = {
|
||||
@@ -672,23 +695,27 @@ void test_cancel_written_null_error()
|
||||
// The cancelled request's response arrives. It contains NULLs and errors.
|
||||
// We ignore them
|
||||
error_code ec;
|
||||
auto res = mpx.consume_next("-ERR wrong command\r\n", ec);
|
||||
read(mpx, "-ERR wrong command\r\n");
|
||||
auto res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_NOT(item2->done);
|
||||
|
||||
res = mpx.consume_next("!3\r\nBad\r\n", ec);
|
||||
read(mpx, "!3\r\nBad\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_NOT(item2->done);
|
||||
|
||||
res = mpx.consume_next("_\r\n", ec);
|
||||
read(mpx, "_\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_NOT(item2->done);
|
||||
|
||||
// The 2nd request's response arrives. It gets parsed successfully
|
||||
res = mpx.consume_next("$11\r\nHello world\r\n", ec);
|
||||
read(mpx, "$11\r\nHello world\r\n");
|
||||
res = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
BOOST_TEST(item2->done);
|
||||
const node expected[] = {
|
||||
@@ -812,7 +839,8 @@ void test_cancel_on_connection_lost_abandoned()
|
||||
// mpx.add(item.elem_ptr);
|
||||
// BOOST_TEST_EQ(mpx.prepare_write(), 1u);
|
||||
// BOOST_TEST_EQ(mpx.commit_write(), 0u);
|
||||
// auto res = mpx.consume_next("*2\r\n+hello\r\n", ec);
|
||||
// read(mpx, "*2\r\n+hello\r\n");
|
||||
// auto res = mpx.consume(ec);
|
||||
// BOOST_TEST_EQ(res.first, consume_result::needs_more);
|
||||
// BOOST_TEST_EQ(ec, error_code());
|
||||
|
||||
@@ -827,7 +855,8 @@ void test_cancel_on_connection_lost_abandoned()
|
||||
// // Successful write, and this time the response is complete
|
||||
// BOOST_TEST_EQ(mpx.prepare_write(), 1u);
|
||||
// BOOST_TEST_EQ(mpx.commit_write(), 0u);
|
||||
// res = mpx.consume_next("*2\r\n+hello\r\n+world\r\n", ec);
|
||||
// read(mpx, "*2\r\n+hello\r\n+world\r\n");
|
||||
// res = mpx.consume(ec);
|
||||
// BOOST_TEST_EQ(res.first, consume_result::got_response);
|
||||
// BOOST_TEST_EQ(ec, error_code());
|
||||
|
||||
@@ -854,7 +883,8 @@ void test_reset()
|
||||
|
||||
// Start parsing a push
|
||||
error_code ec;
|
||||
auto ret = mpx.consume_next(">2\r", ec);
|
||||
read(mpx, ">2\r");
|
||||
auto ret = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
|
||||
|
||||
// Connection lost. The first request gets cancelled
|
||||
@@ -871,7 +901,8 @@ void test_reset()
|
||||
BOOST_TEST_EQ(mpx.commit_write(), 0u);
|
||||
|
||||
std::string_view response_buffer = "$11\r\nHello world\r\n";
|
||||
ret = mpx.consume_next(response_buffer, ec);
|
||||
read(mpx, response_buffer);
|
||||
ret = mpx.consume(ec);
|
||||
BOOST_TEST_EQ(ret.first, consume_result::got_response);
|
||||
BOOST_TEST_EQ(ret.second, response_buffer.size());
|
||||
const node expected[] = {
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
|
||||
#include <boost/redis/detail/read_buffer.hpp>
|
||||
#include <boost/redis/detail/reader_fsm.hpp>
|
||||
|
||||
#include <boost/asio/cancellation_type.hpp>
|
||||
@@ -21,9 +20,9 @@ using boost::system::error_code;
|
||||
using net::cancellation_type_t;
|
||||
using redis::detail::reader_fsm;
|
||||
using redis::detail::multiplexer;
|
||||
using redis::detail::read_buffer;
|
||||
using redis::generic_response;
|
||||
using redis::any_adapter;
|
||||
using redis::config;
|
||||
using action = redis::detail::reader_fsm::action;
|
||||
|
||||
namespace boost::redis::detail {
|
||||
@@ -35,6 +34,20 @@ std::ostream& operator<<(std::ostream& os, reader_fsm::action::type t)
|
||||
os << to_string(t);
|
||||
return os;
|
||||
}
|
||||
|
||||
// Copy data into the multiplexer with the following steps
|
||||
//
|
||||
// 1. get_read_buffer
|
||||
// 2. Copy data in the buffer from 2.
|
||||
//
|
||||
// This is used in the reader_fsm tests.
|
||||
void copy_to(multiplexer& mpx, std::string_view data)
|
||||
{
|
||||
auto const buffer = mpx.get_prepared_read_buffer();
|
||||
BOOST_ASSERT(buffer.size() >= data.size());
|
||||
std::copy(data.cbegin(), data.cend(), buffer.begin());
|
||||
}
|
||||
|
||||
} // namespace boost::redis::detail
|
||||
|
||||
// Operators
|
||||
@@ -42,11 +55,10 @@ namespace {
|
||||
|
||||
void test_push()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
reader_fsm fsm{mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -54,7 +66,7 @@ void test_push()
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
BOOST_TEST_EQ(act.type_, action::type::read_some);
|
||||
|
||||
// The fsm is asking for data.
|
||||
std::string const payload =
|
||||
@@ -62,7 +74,7 @@ void test_push()
|
||||
">1\r\n+msg2 \r\n"
|
||||
">1\r\n+msg3 \r\n";
|
||||
|
||||
append_read_data(rbuf, payload);
|
||||
copy_to(mpx, payload);
|
||||
|
||||
// Deliver the 1st push
|
||||
act = fsm.resume(payload.size(), ec, cancellation_type_t::none);
|
||||
@@ -84,17 +96,16 @@ void test_push()
|
||||
|
||||
// All pushes were delivered so the fsm should demand more data
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
BOOST_TEST_EQ(act.type_, action::type::read_some);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
}
|
||||
|
||||
void test_read_needs_more()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
reader_fsm fsm{mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -102,27 +113,27 @@ void test_read_needs_more()
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
BOOST_TEST_EQ(act.type_, action::type::read_some);
|
||||
|
||||
// Split the incoming message in three random parts and deliver
|
||||
// them to the reader individually.
|
||||
std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"};
|
||||
|
||||
// Passes the first part to the fsm.
|
||||
append_read_data(rbuf, msg[0]);
|
||||
copy_to(mpx, msg[0]);
|
||||
act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::needs_more);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
// Passes the second part to the fsm.
|
||||
append_read_data(rbuf, msg[1]);
|
||||
copy_to(mpx, msg[1]);
|
||||
act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::needs_more);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
// Passes the third and last part to the fsm, next it should ask us
|
||||
// to deliver the message.
|
||||
append_read_data(rbuf, msg[2]);
|
||||
copy_to(mpx, msg[2]);
|
||||
act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
|
||||
BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size());
|
||||
@@ -130,17 +141,16 @@ void test_read_needs_more()
|
||||
|
||||
// All pushes were delivered so the fsm should demand more data
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
BOOST_TEST_EQ(act.type_, action::type::read_some);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
}
|
||||
|
||||
void test_read_error()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
reader_fsm fsm{mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -148,11 +158,11 @@ void test_read_error()
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
BOOST_TEST_EQ(act.type_, action::type::read_some);
|
||||
|
||||
// The fsm is asking for data.
|
||||
std::string const payload = ">1\r\n+msg1\r\n";
|
||||
append_read_data(rbuf, payload);
|
||||
copy_to(mpx, payload);
|
||||
|
||||
// Deliver the data
|
||||
act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none);
|
||||
@@ -167,11 +177,10 @@ void test_read_error()
|
||||
|
||||
void test_parse_error()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
reader_fsm fsm{mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -179,11 +188,11 @@ void test_parse_error()
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
BOOST_TEST_EQ(act.type_, action::type::read_some);
|
||||
|
||||
// The fsm is asking for data.
|
||||
std::string const payload = ">a\r\n";
|
||||
append_read_data(rbuf, payload);
|
||||
copy_to(mpx, payload);
|
||||
|
||||
// Deliver the data
|
||||
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
|
||||
@@ -198,11 +207,10 @@ void test_parse_error()
|
||||
|
||||
void test_push_deliver_error()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
reader_fsm fsm{mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -210,11 +218,11 @@ void test_push_deliver_error()
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
BOOST_TEST_EQ(act.type_, action::type::read_some);
|
||||
|
||||
// The fsm is asking for data.
|
||||
std::string const payload = ">1\r\n+msg1\r\n";
|
||||
append_read_data(rbuf, payload);
|
||||
copy_to(mpx, payload);
|
||||
|
||||
// Deliver the data
|
||||
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
|
||||
@@ -233,12 +241,15 @@ void test_push_deliver_error()
|
||||
|
||||
void test_max_read_buffer_size()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
rbuf.set_config({5, 7});
|
||||
config cfg;
|
||||
cfg.read_buffer_append_size = 5;
|
||||
cfg.max_read_size = 7;
|
||||
|
||||
multiplexer mpx;
|
||||
mpx.set_config(cfg);
|
||||
generic_response resp;
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
reader_fsm fsm{mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -246,11 +257,11 @@ void test_max_read_buffer_size()
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
BOOST_TEST_EQ(act.type_, action::type::read_some);
|
||||
|
||||
// Passes the first part to the fsm.
|
||||
std::string const part1 = ">3\r\n";
|
||||
append_read_data(rbuf, part1);
|
||||
copy_to(mpx, part1);
|
||||
act = fsm.resume(part1.size(), {}, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
Reference in New Issue
Block a user