2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Add parse event init, node and done.

This commit is contained in:
Marcelo Zimbres
2025-07-21 22:42:55 +02:00
parent 88d8f3c0ca
commit 16bf57cf33
22 changed files with 332 additions and 228 deletions

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
@@ -19,6 +19,18 @@
namespace boost::redis {
/** @brief Parse events that an adapter must support.
*/
enum class parse_event
{
/// Called before the parser starts processing data
init,
/// Called for each and every node of RESP3 data
node,
/// Called when done processing a complete RESP3 message
done
};
/** @brief A type-erased reference to a response.
*
* A type-erased response adapter. It can be executed using @ref connection::async_exec.
@@ -29,46 +41,63 @@ namespace boost::redis {
*
* @code
* co_await conn.async_exec(req, resp);
* co_await conn.async_exec(req, any_response(resp));
* co_await conn.async_exec(req, any_adapter(...));
* @endcode
*/
class any_adapter {
using any_adapter = std::function<void(parse_event, resp3::node_view const&, system::error_code&)>;
namespace detail {
template <class T>
auto make_any_adapter(T& resp) -> any_adapter
{
using namespace boost::redis::adapter;
return [adapter = boost_redis_adapt(
resp)](parse_event ev, resp3::node_view const& nd, system::error_code& ec) mutable {
switch (ev) {
case parse_event::init: adapter.on_init(); break;
case parse_event::node: adapter.on_node(nd, ec); break;
case parse_event::done: adapter.on_done(); break;
}
};
}
class any_adapter_wrapper {
public:
using fn_type = std::function<void(std::size_t, resp3::node_view const&, system::error_code&)>;
struct impl_t {
fn_type adapt_fn;
std::size_t supported_response_size;
} impl_;
template <class T>
static auto create_impl(T& resp) -> impl_t
{
using namespace boost::redis::adapter;
auto adapter = boost_redis_adapt(resp);
std::size_t size = adapter.get_supported_response_size();
return {std::move(adapter), size};
}
template <class Executor>
friend class basic_connection;
/**
* @brief Constructor.
*
* Creates a type-erased response adapter from `resp` by calling
* `boost_redis_adapt`. `T` must be a valid Redis response type.
* Any type passed to @ref connection::async_exec qualifies.
*
* This object stores a reference to `resp`, which must be kept alive
* while `*this` is being used.
*/
template <class T, class = std::enable_if_t<!std::is_same_v<T, any_adapter>>>
explicit any_adapter(T& resp)
: impl_(create_impl(resp))
any_adapter_wrapper(any_adapter adapter = {}, std::size_t expected_responses = 0u)
: adapter_{std::move(adapter)}
, expected_responses_{expected_responses}
{ }
void on_init()
{
system::error_code ec;
adapter_(parse_event::init, {}, ec);
};
void on_done()
{
system::error_code ec;
adapter_(parse_event::done, {}, ec);
BOOST_ASSERT(expected_responses_ != 0u);
expected_responses_ -= 1;
};
void on_node(resp3::node_view const& nd, system::error_code& ec)
{
adapter_(parse_event::node, nd, ec);
};
auto get_remaining_responses() const -> std::size_t { return expected_responses_; }
private:
any_adapter adapter_;
std::size_t expected_responses_ = 0;
};
} // namespace detail
} // namespace boost::redis
#endif

View File

@@ -147,8 +147,12 @@ public:
explicit general_aggregate(Result* c = nullptr)
: result_(c)
{ }
void on_init() { }
void on_done() { }
template <class String>
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
{
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
switch (nd.data_type) {
@@ -180,8 +184,11 @@ public:
: result_(t)
{ }
void on_init() { }
void on_done() { }
template <class String>
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
{
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
switch (nd.data_type) {
@@ -206,8 +213,11 @@ class simple_impl {
public:
void on_value_available(Result&) { }
void on_init() { }
void on_done() { }
template <class String>
void operator()(Result& result, resp3::basic_node<String> const& node, system::error_code& ec)
void on_node(Result& result, resp3::basic_node<String> const& node, system::error_code& ec)
{
if (is_aggregate(node.data_type)) {
ec = redis::error::expects_resp3_simple_type;
@@ -226,8 +236,11 @@ private:
public:
void on_value_available(Result& result) { hint_ = std::end(result); }
void on_init() { }
void on_done() { }
template <class String>
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (is_aggregate(nd.data_type)) {
if (nd.data_type != resp3::type::set)
@@ -257,8 +270,11 @@ private:
public:
void on_value_available(Result& result) { current_ = std::end(result); }
void on_init() { }
void on_done() { }
template <class String>
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (is_aggregate(nd.data_type)) {
if (element_multiplicity(nd.data_type) != 2)
@@ -292,8 +308,11 @@ class vector_impl {
public:
void on_value_available(Result&) { }
void on_init() { }
void on_done() { }
template <class String>
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (is_aggregate(nd.data_type)) {
auto const m = element_multiplicity(nd.data_type);
@@ -313,8 +332,11 @@ private:
public:
void on_value_available(Result&) { }
void on_init() { }
void on_done() { }
template <class String>
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (is_aggregate(nd.data_type)) {
if (i_ != -1) {
@@ -344,8 +366,11 @@ template <class Result>
struct list_impl {
void on_value_available(Result&) { }
void on_init() { }
void on_done() { }
template <class String>
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
{
if (!is_aggregate(nd.data_type)) {
BOOST_ASSERT(nd.aggregate_size == 1);
@@ -468,8 +493,11 @@ public:
}
}
void on_init() { impl_.on_init(); }
void on_done() { impl_.on_done(); }
template <class String>
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
void on_node(resp3::basic_node<String> const& nd, system::error_code& ec)
{
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
@@ -480,7 +508,7 @@ public:
return;
BOOST_ASSERT(result_);
impl_(result_->value(), nd, ec);
impl_.on_node(result_->value(), nd, ec);
}
};
@@ -514,8 +542,11 @@ public:
: result_(o)
{ }
void on_init() { impl_.on_init(); }
void on_done() { impl_.on_done(); }
template <class String>
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
void on_node(resp3::basic_node<String> const& nd, system::error_code& ec)
{
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
@@ -533,7 +564,7 @@ public:
impl_.on_value_available(result_->value().value());
}
impl_(result_->value().value(), nd, ec);
impl_.on_node(result_->value().value(), nd, ec);
}
};

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
@@ -8,6 +8,7 @@
#define BOOST_REDIS_ADAPTER_DETAIL_RESPONSE_TRAITS_HPP
#include <boost/redis/adapter/detail/result_traits.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/response.hpp>
@@ -21,26 +22,6 @@
namespace boost::redis::adapter::detail {
class ignore_adapter {
public:
template <class String>
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
{
switch (nd.data_type) {
case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break;
case resp3::type::blob_error: ec = redis::error::resp3_blob_error; break;
case resp3::type::null: ec = redis::error::resp3_null; break;
default: ;
}
}
[[nodiscard]]
auto get_supported_response_size() const noexcept
{
return static_cast<std::size_t>(-1);
}
};
template <class Response>
class static_adapter {
private:
@@ -50,51 +31,44 @@ private:
using adapters_array_type = std::array<variant_type, size>;
adapters_array_type adapters_;
std::size_t i_ = 0;
public:
explicit static_adapter(Response& r) { assigner<size - 1>::assign(adapters_, r); }
[[nodiscard]]
auto get_supported_response_size() const noexcept
{
return size;
}
template <class String>
void operator()(std::size_t i, resp3::basic_node<String> const& nd, system::error_code& ec)
void on_init()
{
using std::visit;
// I am usure whether this should be an error or an assertion.
BOOST_ASSERT(i < adapters_.size());
visit(
[&](auto& arg) {
arg(nd, ec);
arg.on_init();
},
adapters_.at(i));
adapters_.at(i_));
}
};
template <class Vector>
class vector_adapter {
private:
using adapter_type = typename result_traits<Vector>::adapter_type;
adapter_type adapter_;
public:
explicit vector_adapter(Vector& v)
: adapter_{internal_adapt(v)}
{ }
[[nodiscard]]
auto get_supported_response_size() const noexcept
void on_done()
{
return static_cast<std::size_t>(-1);
using std::visit;
visit(
[&](auto& arg) {
arg.on_done();
},
adapters_.at(i_));
i_ += 1;
}
template <class String>
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
void on_node(resp3::basic_node<String> const& nd, system::error_code& ec)
{
adapter_(nd, ec);
using std::visit;
// I am usure whether this should be an error or an assertion.
BOOST_ASSERT(i_ < adapters_.size());
visit(
[&](auto& arg) {
arg.on_node(nd, ec);
},
adapters_.at(i_));
}
};
@@ -104,25 +78,25 @@ struct response_traits;
template <>
struct response_traits<ignore_t> {
using response_type = ignore_t;
using adapter_type = detail::ignore_adapter;
using adapter_type = ignore;
static auto adapt(response_type&) noexcept { return detail::ignore_adapter{}; }
static auto adapt(response_type&) noexcept { return ignore{}; }
};
template <>
struct response_traits<result<ignore_t>> {
using response_type = result<ignore_t>;
using adapter_type = detail::ignore_adapter;
using adapter_type = ignore;
static auto adapt(response_type&) noexcept { return detail::ignore_adapter{}; }
static auto adapt(response_type&) noexcept { return ignore{}; }
};
template <class String, class Allocator>
struct response_traits<result<std::vector<resp3::basic_node<String>, Allocator>>> {
using response_type = result<std::vector<resp3::basic_node<String>, Allocator>>;
using adapter_type = vector_adapter<response_type>;
using adapter_type = general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{v}; }
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <class... Ts>
@@ -133,35 +107,6 @@ struct response_traits<response<Ts...>> {
static auto adapt(response_type& r) noexcept { return adapter_type{r}; }
};
template <class Adapter>
class wrapper {
public:
explicit wrapper(Adapter adapter)
: adapter_{adapter}
{ }
template <class String>
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
{
return adapter_(0, nd, ec);
}
[[nodiscard]]
auto get_supported_response_size() const noexcept
{
return adapter_.get_supported_response_size();
}
private:
Adapter adapter_;
};
template <class Adapter>
auto make_adapter_wrapper(Adapter adapter)
{
return wrapper{adapter};
}
} // namespace boost::redis::adapter::detail
#endif // BOOST_REDIS_ADAPTER_DETAIL_RESPONSE_TRAITS_HPP

View File

@@ -132,8 +132,32 @@ public:
}
}
void on_init()
{
using std::visit;
for (auto& adapter : adapters_) {
visit(
[&](auto& arg) {
arg.on_init();
},
adapter);
}
}
void on_done()
{
using std::visit;
for (auto& adapter : adapters_) {
visit(
[&](auto& arg) {
arg.on_done();
},
adapter);
}
}
template <class String>
void operator()(resp3::basic_node<String> const& elem, system::error_code& ec)
void on_node(resp3::basic_node<String> const& elem, system::error_code& ec)
{
using std::visit;
@@ -148,9 +172,9 @@ public:
visit(
[&](auto& arg) {
arg(elem, ec);
arg.on_node(elem, ec);
},
adapters_[i_]);
adapters_.at(i_));
count(elem);
}
};

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
@@ -19,7 +19,10 @@ namespace boost::redis::adapter {
* RESP3 errors won't be ignored.
*/
struct ignore {
void operator()(resp3::basic_node<std::string_view> const& nd, system::error_code& ec)
void on_init() { }
void on_done() { }
void on_node(resp3::basic_node<std::string_view> const& nd, system::error_code& ec)
{
switch (nd.data_type) {
case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break;

View File

@@ -138,7 +138,7 @@ struct connection_impl {
, health_checker_{ex}
, logger_{std::move(lgr)}
{
mpx_.set_receive_response(ignore);
set_receive_response(ignore);
writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
// Reserve some memory to avoid excessive memory allocations in
@@ -187,13 +187,8 @@ struct connection_impl {
template <class CompletionToken>
auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token)
{
auto& adapter_impl = adapter.impl_;
BOOST_ASSERT_MSG(
req.get_expected_responses() <= adapter_impl.supported_response_size,
"Request and response have incompatible sizes.");
auto notifier = std::make_shared<exec_notifier_type>(writer_timer_.get_executor(), 1);
auto info = make_elem(req, std::move(adapter_impl.adapt_fn));
auto notifier = std::make_shared<exec_notifier_type>(get_executor(), 1);
auto info = make_elem(req, std::move(adapter));
info->set_done_callback([notifier]() {
notifier->try_send(std::error_code{}, 0);
@@ -204,6 +199,14 @@ struct connection_impl {
token,
writer_timer_);
}
template <class Response>
void set_receive_response(Response& response)
{
auto adapter = detail::make_any_adapter(response);
mpx_.set_receive_response(std::move(adapter));
}
};
template <class Executor>
@@ -765,7 +768,10 @@ public:
class CompletionToken = asio::default_completion_token_t<executor_type>>
auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
{
return this->async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
return this->async_exec(
req,
detail::make_any_adapter(resp),
std::forward<CompletionToken>(token));
}
/** @brief Executes commands on the Redis server asynchronously.
@@ -893,7 +899,7 @@ public:
template <class Response>
void set_receive_response(Response& response)
{
impl_->mpx_.set_receive_response(response);
impl_->set_receive_response(response);
}
/// Returns connection usage information.
@@ -1070,7 +1076,7 @@ public:
template <class Response = ignore_t, class CompletionToken = asio::deferred_t>
auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
{
return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
return async_exec(req, detail::make_any_adapter(resp), std::forward<CompletionToken>(token));
}
/**

View File

@@ -51,7 +51,7 @@ public:
}
BOOST_ASIO_CORO_YIELD
conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
conn_->async_exec(checker_->req_, make_any_adapter(checker_->resp_), std::move(self));
if (ec) {
conn_->logger_.trace("ping_op (3)", ec);
checker_->wait_timer_.cancel();

View File

@@ -8,6 +8,7 @@
#define BOOST_REDIS_MULTIPLEXER_HPP
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/parser.hpp>
@@ -34,13 +35,9 @@ using tribool = std::optional<bool>;
class multiplexer {
public:
using adapter_type = std::function<void(resp3::node_view const&, system::error_code&)>;
using pipeline_adapter_type = std::function<
void(std::size_t, resp3::node_view const&, system::error_code&)>;
struct elem {
public:
explicit elem(request const& req, pipeline_adapter_type adapter);
explicit elem(request const& req, any_adapter adapter);
void set_done_callback(std::function<void()> f) noexcept { done_ = std::move(f); };
@@ -88,11 +85,9 @@ public:
auto get_read_size() const -> std::size_t { return read_size_; }
auto get_remaining_responses() const -> std::size_t { return remaining_responses_; }
auto commit_response(std::size_t read_size) -> void;
auto get_adapter() -> adapter_type& { return adapter_; }
auto get_adapter() -> any_adapter_wrapper& { return adapter_; }
private:
enum class status
@@ -104,7 +99,7 @@ public:
};
request const* req_;
adapter_type adapter_;
any_adapter_wrapper adapter_;
std::function<void()> done_;
@@ -158,15 +153,7 @@ public:
return std::string_view{write_buffer_};
}
// TODO: Change signature to receive an adapter instead of a
// response.
template <class Response>
void set_receive_response(Response& response)
{
using namespace boost::redis::adapter;
auto g = boost_redis_adapt(response);
receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
}
void set_receive_response(any_adapter adapter);
[[nodiscard]]
auto get_usage() const noexcept -> usage
@@ -199,11 +186,10 @@ private:
bool on_push_ = false;
bool cancel_run_called_ = false;
usage usage_;
adapter_type receive_adapter_;
any_adapter_wrapper receive_adapter_;
};
auto make_elem(request const& req, multiplexer::pipeline_adapter_type adapter)
-> std::shared_ptr<multiplexer::elem>;
auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr<multiplexer::elem>;
} // namespace detail
} // namespace boost::redis

View File

@@ -42,7 +42,7 @@ struct hello_op {
BOOST_ASIO_CORO_YIELD
conn_->async_exec(
handshaker_->hello_req_,
any_adapter(handshaker_->hello_resp_),
make_any_adapter(handshaker_->hello_resp_),
std::move(self));
conn_->logger_.on_hello(ec, handshaker_->hello_resp_);

View File

@@ -11,19 +11,13 @@
namespace boost::redis::detail {
multiplexer::elem::elem(request const& req, pipeline_adapter_type adapter)
multiplexer::elem::elem(request const& req, any_adapter adapter)
: req_{&req}
, adapter_{}
, remaining_responses_{req.get_expected_responses()}
, adapter_{any_adapter_wrapper{adapter, req.get_expected_responses()}}
, status_{status::waiting}
, ec_{}
, read_size_{0}
{
adapter_ = [this, adapter](resp3::node_view const& nd, system::error_code& ec) {
auto const i = req_->get_expected_responses() - remaining_responses_;
adapter(i, nd, ec);
};
}
{ }
auto multiplexer::elem::notify_error(system::error_code ec) noexcept -> void
{
@@ -110,7 +104,7 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code
"Not waiting for a response (using MONITOR command perhaps?)");
BOOST_ASSERT(!reqs_.empty());
BOOST_ASSERT(reqs_.front() != nullptr);
BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0);
BOOST_ASSERT(reqs_.front()->get_adapter().get_remaining_responses() != 0);
if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec))
return std::nullopt;
@@ -122,7 +116,7 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code
}
reqs_.front()->commit_response(parser_.get_consumed());
if (reqs_.front()->get_remaining_responses() == 0) {
if (reqs_.front()->get_adapter().get_remaining_responses() == 0) {
// Done with this request.
reqs_.front()->notify_done();
reqs_.pop_front();
@@ -274,7 +268,7 @@ bool multiplexer::is_next_push(std::string_view data) const noexcept
// The request does not expect any response but we got one. This
// may happen if for example, subscribe with wrong syntax.
if (reqs_.front()->get_remaining_responses() == 0)
if (reqs_.front()->get_adapter().get_remaining_responses() == 0)
return true;
// Added to deal with MONITOR and also to fix PR170 which
@@ -314,8 +308,12 @@ bool multiplexer::is_waiting_response() const noexcept
bool multiplexer::is_writing() const noexcept { return !write_buffer_.empty(); }
auto make_elem(request const& req, multiplexer::pipeline_adapter_type adapter)
-> std::shared_ptr<multiplexer::elem>
void multiplexer::set_receive_response(any_adapter adapter)
{
receive_adapter_ = any_adapter_wrapper{std::move(adapter), static_cast<std::size_t>(-1)};
}
auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr<multiplexer::elem>
{
return std::make_shared<multiplexer::elem>(req, std::move(adapter));
}

View File

@@ -27,11 +27,10 @@ parser::parser() { reset(); }
void parser::reset()
{
depth_ = 0;
sizes_ = {{1}};
bulk_length_ = (std::numeric_limits<std::size_t>::max)();
sizes_ = default_sizes;
bulk_length_ = default_bulk_length;
bulk_ = type::invalid;
consumed_ = 0;
sizes_[0] = 2; // The sentinel must be more than 1.
}
std::size_t parser::get_consumed() const noexcept { return consumed_; }
@@ -195,4 +194,13 @@ auto parser::consume_impl(type t, std::string_view elem, system::error_code& ec)
return ret;
}
bool parser::is_parsing() const noexcept
{
auto const v = depth_ == 0 && sizes_ == default_sizes && bulk_length_ == default_bulk_length &&
bulk_ == type::invalid && consumed_ == 0;
return !v;
}
} // namespace boost::redis::resp3

View File

@@ -27,6 +27,14 @@ public:
static constexpr std::string_view sep = "\r\n";
private:
using sizes_type = std::array<std::size_t, max_embedded_depth + 1>;
// sizes_[0] = 2 because the sentinel must be more than 1.
static constexpr sizes_type default_sizes = {
{2, 1, 1, 1, 1, 1}
};
static constexpr auto default_bulk_length = static_cast<std::size_t>(-1);
// The current depth. Simple data types will have depth 0, whereas
// the elements of aggregates will have depth 1. Embedded types
// will have increasing depth.
@@ -35,7 +43,7 @@ private:
// The parser supports up to 5 levels of nested structures. The
// first element in the sizes stack is a sentinel and must be
// different from 1.
std::array<std::size_t, max_embedded_depth + 1> sizes_;
sizes_type sizes_;
// Contains the length expected in the next bulk read.
std::size_t bulk_length_;
@@ -72,14 +80,21 @@ public:
auto consume(std::string_view view, system::error_code& ec) noexcept -> result;
void reset();
bool is_parsing() const noexcept;
};
// Returns false if more data is needed. If true is returned the
// parser is either done or an error occured, that can be checked on
// ec.
template <class Adapter>
bool parse(resp3::parser& p, std::string_view const& msg, Adapter& adapter, system::error_code& ec)
bool parse(parser& p, std::string_view const& msg, Adapter& adapter, system::error_code& ec)
{
// This if could be avoid with a state machine that jumps into the
// correct position.
if (!p.is_parsing())
adapter.on_init();
while (!p.done()) {
auto const res = p.consume(msg, ec);
if (ec)
@@ -88,11 +103,12 @@ bool parse(resp3::parser& p, std::string_view const& msg, Adapter& adapter, syst
if (!res)
return false;
adapter(res.value(), ec);
adapter.on_node(res.value(), ec);
if (ec)
return true;
}
adapter.on_done();
return true;
}

View File

@@ -108,6 +108,8 @@ namespace detail {
template <class Adapter>
void deserialize(std::string_view const& data, Adapter adapter, system::error_code& ec)
{
adapter.on_init();
parser parser;
while (!parser.done()) {
auto const res = parser.consume(data, ec);
@@ -116,12 +118,14 @@ void deserialize(std::string_view const& data, Adapter adapter, system::error_co
BOOST_ASSERT(res.has_value());
adapter(res.value(), ec);
adapter.on_node(res.value(), ec);
if (ec)
return;
}
BOOST_ASSERT(parser.get_consumed() == std::size(data));
adapter.on_done();
}
template <class Adapter>

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
@@ -10,7 +10,7 @@
#include <boost/redis/adapter/result.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/system.hpp>
#include <boost/system/error_code.hpp>
#include <string>
#include <tuple>

View File

@@ -44,7 +44,6 @@ make_test(test_reader_fsm)
# Tests that require a real Redis server
make_test(test_conn_quit)
make_test(test_conn_tls)
make_test(test_conn_exec_retry)
make_test(test_conn_exec_error)
make_test(test_run)
@@ -59,6 +58,7 @@ make_test(test_conn_move)
make_test(test_issue_50)
make_test(test_issue_181)
make_test(test_conversions)
make_test(test_conn_tls)
make_test(test_unix_sockets)
# Coverage

View File

@@ -16,6 +16,7 @@ using boost::redis::generic_response;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::any_adapter;
using boost::redis::detail::make_any_adapter;
BOOST_AUTO_TEST_CASE(any_adapter_response_types)
{
@@ -24,23 +25,23 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types)
response<int, std::string> r2;
generic_response r3;
BOOST_CHECK_NO_THROW(any_adapter{r1});
BOOST_CHECK_NO_THROW(any_adapter{r2});
BOOST_CHECK_NO_THROW(any_adapter{r3});
BOOST_CHECK_NO_THROW(any_adapter{ignore});
BOOST_CHECK_NO_THROW(make_any_adapter(r1));
BOOST_CHECK_NO_THROW(make_any_adapter(r2));
BOOST_CHECK_NO_THROW(make_any_adapter(r3));
BOOST_CHECK_NO_THROW(make_any_adapter(ignore));
}
BOOST_AUTO_TEST_CASE(any_adapter_copy_move)
{
// any_adapter can be copied/moved
response<int, std::string> r;
any_adapter ad1{r};
auto ad1 = make_any_adapter(r);
// copy constructor
any_adapter ad2{ad1};
auto ad2 = any_adapter(ad1);
// move constructor
any_adapter ad3{std::move(ad2)};
auto ad3 = any_adapter(std::move(ad2));
// copy assignment
BOOST_CHECK_NO_THROW(ad2 = ad1);

View File

@@ -31,6 +31,7 @@ using boost::redis::ignore;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::detail::make_any_adapter;
using boost::system::error_code;
using namespace std::chrono_literals;
@@ -229,7 +230,7 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter)
bool finished = false;
conn->async_exec(req, boost::redis::any_adapter(res), [&](error_code ec, std::size_t) {
conn->async_exec(req, make_any_adapter(res), [&](error_code ec, std::size_t) {
BOOST_TEST(ec == error_code());
conn->cancel();
finished = true;
@@ -242,4 +243,4 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter)
BOOST_TEST(std::get<0>(res).value() == "PONG");
}
} // namespace
} // namespace

View File

@@ -180,19 +180,15 @@ struct response_error_tag { };
response_error_tag error_tag_obj;
struct response_error_adapter {
void operator()(
std::size_t,
void on_init() { }
void on_done() { }
void on_node(
boost::redis::resp3::basic_node<std::string_view> const&,
boost::system::error_code& ec)
{
ec = boost::redis::error::incompatible_size;
}
[[nodiscard]]
auto get_supported_response_size() const noexcept
{
return static_cast<std::size_t>(-1);
}
};
auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; }

View File

@@ -86,7 +86,8 @@ struct elem_and_request {
elm = std::make_shared<multiplexer::elem>(
req,
[](std::size_t, resp3::node_view const&, error_code&) { });
[](parse_event, resp3::node_view const&, error_code&) { });
elm->set_done_callback([this] {
++done_calls;
});

View File

@@ -595,8 +595,12 @@ BOOST_AUTO_TEST_CASE(adapter)
response<std::string, int, ignore_t> resp;
auto f = boost_redis_adapt(resp);
f(0, resp3::basic_node<std::string_view>{type::simple_string, 1, 0, "Hello"}, ec);
f(1, resp3::basic_node<std::string_view>{type::number, 1, 0, "42"}, ec);
f.on_init();
f.on_node(resp3::node_view{type::simple_string, 1, 0, "Hello"}, ec);
f.on_done();
f.on_init();
f.on_node(resp3::node_view{type::number, 1, 0, "42"}, ec);
f.on_done();
BOOST_CHECK_EQUAL(std::get<0>(resp).value(), "Hello");
BOOST_TEST(!ec);
@@ -614,7 +618,7 @@ BOOST_AUTO_TEST_CASE(adapter_as)
for (auto const& e : set_expected1a.value()) {
error_code ec;
adapter(e, ec);
adapter.on_node(e, ec);
}
}

View File

@@ -32,16 +32,21 @@ using boost::redis::detail::multiplexer;
using boost::redis::generic_response;
using boost::redis::resp3::node;
using boost::redis::resp3::to_string;
using boost::redis::any_adapter;
using boost::redis::detail::make_any_adapter;
using boost::system::error_code;
#define RESP3_SET_PART1 "~6\r\n+orange\r"
#define RESP3_SET_PART2 "\n+apple\r\n+one"
#define RESP3_SET_PART3 "\r\n+two\r"
#define RESP3_SET_PART4 "\n+three\r\n+orange\r\n"
char const* resp3_set = RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3 RESP3_SET_PART4;
BOOST_AUTO_TEST_CASE(low_level_sync_sans_io)
{
try {
result<std::set<std::string>> resp;
char const* wire = "~6\r\n+orange\r\n+apple\r\n+one\r\n+two\r\n+three\r\n+orange\r\n";
deserialize(wire, adapt2(resp));
deserialize(resp3_set, adapt2(resp));
for (auto const& e : resp.value())
std::cout << e << std::endl;
@@ -260,7 +265,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push)
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.set_receive_response(make_any_adapter(resp));
boost::system::error_code ec;
auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec);
@@ -282,7 +287,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.set_receive_response(make_any_adapter(resp));
std::string msg;
// Only part of the message.
@@ -318,7 +323,7 @@ struct test_item {
// to Redis.
req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg");
elem_ptr = std::make_shared<multiplexer::elem>(req, any_adapter(resp).impl_.adapt_fn);
elem_ptr = std::make_shared<multiplexer::elem>(req, make_any_adapter(resp));
elem_ptr->set_done_callback([this]() {
done = true;
@@ -462,3 +467,48 @@ BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u);
}
BOOST_AUTO_TEST_CASE(check_counter_adapter)
{
using boost::redis::parse_event;
using boost::redis::detail::any_adapter_wrapper;
using boost::redis::resp3::parse;
using boost::redis::resp3::parser;
using boost::redis::resp3::node_view;
using boost::system::error_code;
int init = 0;
int node = 0;
int done = 0;
auto counter_adapter = [&](parse_event ev, node_view const&, error_code&) mutable {
switch (ev) {
case parse_event::init: init++; break;
case parse_event::node: node++; break;
case parse_event::done: done++; break;
}
};
any_adapter_wrapper wrapped{counter_adapter, 1};
error_code ec;
parser p;
auto const ret1 = parse(p, RESP3_SET_PART1, wrapped, ec);
auto const ret2 = parse(p, RESP3_SET_PART1 RESP3_SET_PART2, wrapped, ec);
auto const ret3 = parse(p, RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3, wrapped, ec);
auto const ret4 = parse(
p,
RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3 RESP3_SET_PART4,
wrapped,
ec);
BOOST_TEST(!ret1);
BOOST_TEST(!ret2);
BOOST_TEST(!ret3);
BOOST_TEST(ret4);
BOOST_CHECK_EQUAL(init, 1);
BOOST_CHECK_EQUAL(node, 7);
BOOST_CHECK_EQUAL(done, 1);
}

View File

@@ -23,6 +23,7 @@ using redis::detail::reader_fsm;
using redis::detail::multiplexer;
using redis::detail::read_buffer;
using redis::generic_response;
using redis::detail::make_any_adapter;
using action = redis::detail::reader_fsm::action;
namespace boost::redis::detail {
@@ -44,7 +45,7 @@ void test_push()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.set_receive_response(make_any_adapter(resp));
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -92,7 +93,7 @@ void test_read_needs_more()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.set_receive_response(make_any_adapter(resp));
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -138,7 +139,7 @@ void test_read_error()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.set_receive_response(make_any_adapter(resp));
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -169,7 +170,7 @@ void test_parse_error()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.set_receive_response(make_any_adapter(resp));
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -200,7 +201,7 @@ void test_push_deliver_error()
read_buffer rbuf;
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.set_receive_response(make_any_adapter(resp));
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;
@@ -236,7 +237,7 @@ void test_max_read_buffer_size()
rbuf.set_config({5, 7});
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.set_receive_response(make_any_adapter(resp));
reader_fsm fsm{rbuf, mpx};
error_code ec;
action act;