mirror of
https://github.com/boostorg/redis.git
synced 2026-01-20 05:02:12 +00:00
Compare commits
6 Commits
boost-1.89
...
parser_eve
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a76a621b0b | ||
|
|
16bf57cf33 | ||
|
|
88d8f3c0ca | ||
|
|
20ab2c7e70 | ||
|
|
8ee2213efe | ||
|
|
97d71d1d6b |
@@ -1,4 +1,4 @@
|
||||
/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
@@ -34,24 +34,41 @@ namespace boost::redis {
|
||||
*/
|
||||
class any_adapter {
|
||||
public:
|
||||
using fn_type = std::function<void(std::size_t, resp3::node_view const&, system::error_code&)>;
|
||||
/** @brief Parse events that an adapter must support.
|
||||
*/
|
||||
enum class parse_event
|
||||
{
|
||||
/// Called before the parser starts processing data
|
||||
init,
|
||||
/// Called for each and every node of RESP3 data
|
||||
node,
|
||||
/// Called when done processing a complete RESP3 message
|
||||
done
|
||||
};
|
||||
|
||||
struct impl_t {
|
||||
fn_type adapt_fn;
|
||||
std::size_t supported_response_size;
|
||||
} impl_;
|
||||
/// The type erased implementation type.
|
||||
using impl_t = std::function<void(parse_event, resp3::node_view const&, system::error_code&)>;
|
||||
|
||||
template <class T>
|
||||
static auto create_impl(T& resp) -> impl_t
|
||||
{
|
||||
using namespace boost::redis::adapter;
|
||||
auto adapter = boost_redis_adapt(resp);
|
||||
std::size_t size = adapter.get_supported_response_size();
|
||||
return {std::move(adapter), size};
|
||||
return [adapter2 = boost_redis_adapt(resp)](
|
||||
any_adapter::parse_event ev,
|
||||
resp3::node_view const& nd,
|
||||
system::error_code& ec) mutable {
|
||||
switch (ev) {
|
||||
case parse_event::init: adapter2.on_init(); break;
|
||||
case parse_event::node: adapter2.on_node(nd, ec); break;
|
||||
case parse_event::done: adapter2.on_done(); break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
template <class Executor>
|
||||
friend class basic_connection;
|
||||
/// Contructs from a type erased adaper
|
||||
any_adapter(impl_t fn = [](parse_event, resp3::node_view const&, system::error_code&) { })
|
||||
: impl_{std::move(fn)}
|
||||
{ }
|
||||
|
||||
/**
|
||||
* @brief Constructor.
|
||||
@@ -67,6 +84,29 @@ public:
|
||||
explicit any_adapter(T& resp)
|
||||
: impl_(create_impl(resp))
|
||||
{ }
|
||||
|
||||
/// Calls the implementation with the arguments `impl_(parse_event::init, ...);`
|
||||
void on_init()
|
||||
{
|
||||
system::error_code ec;
|
||||
impl_(parse_event::init, {}, ec);
|
||||
};
|
||||
|
||||
/// Calls the implementation with the arguments `impl_(parse_event::done, ...);`
|
||||
void on_done()
|
||||
{
|
||||
system::error_code ec;
|
||||
impl_(parse_event::done, {}, ec);
|
||||
};
|
||||
|
||||
/// Calls the implementation with the arguments `impl_(parse_event::node, ...);`
|
||||
void on_node(resp3::node_view const& nd, system::error_code& ec)
|
||||
{
|
||||
impl_(parse_event::node, nd, ec);
|
||||
};
|
||||
|
||||
private:
|
||||
impl_t impl_;
|
||||
};
|
||||
|
||||
} // namespace boost::redis
|
||||
|
||||
@@ -147,8 +147,12 @@ public:
|
||||
explicit general_aggregate(Result* c = nullptr)
|
||||
: result_(c)
|
||||
{ }
|
||||
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
template <class String>
|
||||
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
|
||||
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
|
||||
{
|
||||
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
|
||||
switch (nd.data_type) {
|
||||
@@ -180,8 +184,11 @@ public:
|
||||
: result_(t)
|
||||
{ }
|
||||
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
template <class String>
|
||||
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
|
||||
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
|
||||
{
|
||||
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
|
||||
switch (nd.data_type) {
|
||||
@@ -206,8 +213,11 @@ class simple_impl {
|
||||
public:
|
||||
void on_value_available(Result&) { }
|
||||
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
template <class String>
|
||||
void operator()(Result& result, resp3::basic_node<String> const& node, system::error_code& ec)
|
||||
void on_node(Result& result, resp3::basic_node<String> const& node, system::error_code& ec)
|
||||
{
|
||||
if (is_aggregate(node.data_type)) {
|
||||
ec = redis::error::expects_resp3_simple_type;
|
||||
@@ -226,8 +236,11 @@ private:
|
||||
public:
|
||||
void on_value_available(Result& result) { hint_ = std::end(result); }
|
||||
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
template <class String>
|
||||
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
if (is_aggregate(nd.data_type)) {
|
||||
if (nd.data_type != resp3::type::set)
|
||||
@@ -257,8 +270,11 @@ private:
|
||||
public:
|
||||
void on_value_available(Result& result) { current_ = std::end(result); }
|
||||
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
template <class String>
|
||||
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
if (is_aggregate(nd.data_type)) {
|
||||
if (element_multiplicity(nd.data_type) != 2)
|
||||
@@ -292,8 +308,11 @@ class vector_impl {
|
||||
public:
|
||||
void on_value_available(Result&) { }
|
||||
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
template <class String>
|
||||
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
if (is_aggregate(nd.data_type)) {
|
||||
auto const m = element_multiplicity(nd.data_type);
|
||||
@@ -313,8 +332,11 @@ private:
|
||||
public:
|
||||
void on_value_available(Result&) { }
|
||||
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
template <class String>
|
||||
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
if (is_aggregate(nd.data_type)) {
|
||||
if (i_ != -1) {
|
||||
@@ -344,8 +366,11 @@ template <class Result>
|
||||
struct list_impl {
|
||||
void on_value_available(Result&) { }
|
||||
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
template <class String>
|
||||
void operator()(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_node(Result& result, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
if (!is_aggregate(nd.data_type)) {
|
||||
BOOST_ASSERT(nd.aggregate_size == 1);
|
||||
@@ -468,8 +493,11 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void on_init() { impl_.on_init(); }
|
||||
void on_done() { impl_.on_done(); }
|
||||
|
||||
template <class String>
|
||||
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_node(resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
|
||||
|
||||
@@ -480,7 +508,7 @@ public:
|
||||
return;
|
||||
|
||||
BOOST_ASSERT(result_);
|
||||
impl_(result_->value(), nd, ec);
|
||||
impl_.on_node(result_->value(), nd, ec);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -514,8 +542,11 @@ public:
|
||||
: result_(o)
|
||||
{ }
|
||||
|
||||
void on_init() { impl_.on_init(); }
|
||||
void on_done() { impl_.on_done(); }
|
||||
|
||||
template <class String>
|
||||
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_node(resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
|
||||
|
||||
@@ -533,7 +564,7 @@ public:
|
||||
impl_.on_value_available(result_->value().value());
|
||||
}
|
||||
|
||||
impl_(result_->value().value(), nd, ec);
|
||||
impl_.on_node(result_->value().value(), nd, ec);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
@@ -8,6 +8,7 @@
|
||||
#define BOOST_REDIS_ADAPTER_DETAIL_RESPONSE_TRAITS_HPP
|
||||
|
||||
#include <boost/redis/adapter/detail/result_traits.hpp>
|
||||
#include <boost/redis/ignore.hpp>
|
||||
#include <boost/redis/resp3/node.hpp>
|
||||
#include <boost/redis/response.hpp>
|
||||
|
||||
@@ -21,26 +22,6 @@
|
||||
|
||||
namespace boost::redis::adapter::detail {
|
||||
|
||||
class ignore_adapter {
|
||||
public:
|
||||
template <class String>
|
||||
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
switch (nd.data_type) {
|
||||
case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break;
|
||||
case resp3::type::blob_error: ec = redis::error::resp3_blob_error; break;
|
||||
case resp3::type::null: ec = redis::error::resp3_null; break;
|
||||
default: ;
|
||||
}
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_supported_response_size() const noexcept
|
||||
{
|
||||
return static_cast<std::size_t>(-1);
|
||||
}
|
||||
};
|
||||
|
||||
template <class Response>
|
||||
class static_adapter {
|
||||
private:
|
||||
@@ -50,51 +31,44 @@ private:
|
||||
using adapters_array_type = std::array<variant_type, size>;
|
||||
|
||||
adapters_array_type adapters_;
|
||||
std::size_t i_ = 0;
|
||||
|
||||
public:
|
||||
explicit static_adapter(Response& r) { assigner<size - 1>::assign(adapters_, r); }
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_supported_response_size() const noexcept
|
||||
{
|
||||
return size;
|
||||
}
|
||||
|
||||
template <class String>
|
||||
void operator()(std::size_t i, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_init()
|
||||
{
|
||||
using std::visit;
|
||||
// I am usure whether this should be an error or an assertion.
|
||||
BOOST_ASSERT(i < adapters_.size());
|
||||
visit(
|
||||
[&](auto& arg) {
|
||||
arg(nd, ec);
|
||||
arg.on_init();
|
||||
},
|
||||
adapters_.at(i));
|
||||
adapters_.at(i_));
|
||||
}
|
||||
};
|
||||
|
||||
template <class Vector>
|
||||
class vector_adapter {
|
||||
private:
|
||||
using adapter_type = typename result_traits<Vector>::adapter_type;
|
||||
adapter_type adapter_;
|
||||
|
||||
public:
|
||||
explicit vector_adapter(Vector& v)
|
||||
: adapter_{internal_adapt(v)}
|
||||
{ }
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_supported_response_size() const noexcept
|
||||
void on_done()
|
||||
{
|
||||
return static_cast<std::size_t>(-1);
|
||||
using std::visit;
|
||||
visit(
|
||||
[&](auto& arg) {
|
||||
arg.on_done();
|
||||
},
|
||||
adapters_.at(i_));
|
||||
i_ += 1;
|
||||
}
|
||||
|
||||
template <class String>
|
||||
void operator()(std::size_t, resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
void on_node(resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
adapter_(nd, ec);
|
||||
using std::visit;
|
||||
|
||||
// I am usure whether this should be an error or an assertion.
|
||||
BOOST_ASSERT(i_ < adapters_.size());
|
||||
visit(
|
||||
[&](auto& arg) {
|
||||
arg.on_node(nd, ec);
|
||||
},
|
||||
adapters_.at(i_));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -104,25 +78,25 @@ struct response_traits;
|
||||
template <>
|
||||
struct response_traits<ignore_t> {
|
||||
using response_type = ignore_t;
|
||||
using adapter_type = detail::ignore_adapter;
|
||||
using adapter_type = ignore;
|
||||
|
||||
static auto adapt(response_type&) noexcept { return detail::ignore_adapter{}; }
|
||||
static auto adapt(response_type&) noexcept { return ignore{}; }
|
||||
};
|
||||
|
||||
template <>
|
||||
struct response_traits<result<ignore_t>> {
|
||||
using response_type = result<ignore_t>;
|
||||
using adapter_type = detail::ignore_adapter;
|
||||
using adapter_type = ignore;
|
||||
|
||||
static auto adapt(response_type&) noexcept { return detail::ignore_adapter{}; }
|
||||
static auto adapt(response_type&) noexcept { return ignore{}; }
|
||||
};
|
||||
|
||||
template <class String, class Allocator>
|
||||
struct response_traits<result<std::vector<resp3::basic_node<String>, Allocator>>> {
|
||||
using response_type = result<std::vector<resp3::basic_node<String>, Allocator>>;
|
||||
using adapter_type = vector_adapter<response_type>;
|
||||
using adapter_type = general_aggregate<response_type>;
|
||||
|
||||
static auto adapt(response_type& v) noexcept { return adapter_type{v}; }
|
||||
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
|
||||
};
|
||||
|
||||
template <class... Ts>
|
||||
@@ -133,35 +107,6 @@ struct response_traits<response<Ts...>> {
|
||||
static auto adapt(response_type& r) noexcept { return adapter_type{r}; }
|
||||
};
|
||||
|
||||
template <class Adapter>
|
||||
class wrapper {
|
||||
public:
|
||||
explicit wrapper(Adapter adapter)
|
||||
: adapter_{adapter}
|
||||
{ }
|
||||
|
||||
template <class String>
|
||||
void operator()(resp3::basic_node<String> const& nd, system::error_code& ec)
|
||||
{
|
||||
return adapter_(0, nd, ec);
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_supported_response_size() const noexcept
|
||||
{
|
||||
return adapter_.get_supported_response_size();
|
||||
}
|
||||
|
||||
private:
|
||||
Adapter adapter_;
|
||||
};
|
||||
|
||||
template <class Adapter>
|
||||
auto make_adapter_wrapper(Adapter adapter)
|
||||
{
|
||||
return wrapper{adapter};
|
||||
}
|
||||
|
||||
} // namespace boost::redis::adapter::detail
|
||||
|
||||
#endif // BOOST_REDIS_ADAPTER_DETAIL_RESPONSE_TRAITS_HPP
|
||||
|
||||
@@ -132,8 +132,32 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void on_init()
|
||||
{
|
||||
using std::visit;
|
||||
for (auto& adapter : adapters_) {
|
||||
visit(
|
||||
[&](auto& arg) {
|
||||
arg.on_init();
|
||||
},
|
||||
adapter);
|
||||
}
|
||||
}
|
||||
|
||||
void on_done()
|
||||
{
|
||||
using std::visit;
|
||||
for (auto& adapter : adapters_) {
|
||||
visit(
|
||||
[&](auto& arg) {
|
||||
arg.on_done();
|
||||
},
|
||||
adapter);
|
||||
}
|
||||
}
|
||||
|
||||
template <class String>
|
||||
void operator()(resp3::basic_node<String> const& elem, system::error_code& ec)
|
||||
void on_node(resp3::basic_node<String> const& elem, system::error_code& ec)
|
||||
{
|
||||
using std::visit;
|
||||
|
||||
@@ -148,9 +172,9 @@ public:
|
||||
|
||||
visit(
|
||||
[&](auto& arg) {
|
||||
arg(elem, ec);
|
||||
arg.on_node(elem, ec);
|
||||
},
|
||||
adapters_[i_]);
|
||||
adapters_.at(i_));
|
||||
count(elem);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
@@ -19,7 +19,10 @@ namespace boost::redis::adapter {
|
||||
* RESP3 errors won't be ignored.
|
||||
*/
|
||||
struct ignore {
|
||||
void operator()(resp3::basic_node<std::string_view> const& nd, system::error_code& ec)
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
void on_node(resp3::basic_node<std::string_view> const& nd, system::error_code& ec)
|
||||
{
|
||||
switch (nd.data_type) {
|
||||
case resp3::type::simple_error: ec = redis::error::resp3_simple_error; break;
|
||||
|
||||
@@ -88,12 +88,21 @@ struct config {
|
||||
*/
|
||||
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};
|
||||
|
||||
/** @brief Maximum size of a socket read, in bytes.
|
||||
/** @brief Maximum size of the socket read-buffer in bytes.
|
||||
*
|
||||
* Sets a limit on how much data is allowed to be read into the
|
||||
* read buffer. It can be used to prevent DDOS.
|
||||
*/
|
||||
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
|
||||
|
||||
/** @brief read_buffer_append_size
|
||||
*
|
||||
* The size by which the read buffer grows when more space is
|
||||
* needed. This can help avoiding some memory allocations. Once the
|
||||
* maximum size is reached no more memory allocations are made
|
||||
* since the buffer is reused.
|
||||
*/
|
||||
std::size_t read_buffer_append_size = 4096;
|
||||
};
|
||||
|
||||
} // namespace boost::redis
|
||||
|
||||
@@ -57,105 +57,155 @@
|
||||
namespace boost::redis {
|
||||
namespace detail {
|
||||
|
||||
template <class AsyncReadStream, class DynamicBuffer>
|
||||
class append_some_op {
|
||||
private:
|
||||
AsyncReadStream& stream_;
|
||||
DynamicBuffer buf_;
|
||||
std::size_t size_ = 0;
|
||||
std::size_t tmp_ = 0;
|
||||
asio::coroutine coro_{};
|
||||
template <class Executor>
|
||||
struct connection_impl {
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
using clock_traits_type = asio::wait_traits<clock_type>;
|
||||
using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, Executor>;
|
||||
|
||||
public:
|
||||
append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
|
||||
: stream_{stream}
|
||||
, buf_{std::move(buf)}
|
||||
, size_{size}
|
||||
{ }
|
||||
using receive_channel_type = asio::experimental::channel<
|
||||
Executor,
|
||||
void(system::error_code, std::size_t)>;
|
||||
using health_checker_type = detail::health_checker<Executor>;
|
||||
using resp3_handshaker_type = detail::resp3_handshaker<Executor>;
|
||||
using exec_notifier_type = asio::experimental::channel<
|
||||
Executor,
|
||||
void(system::error_code, std::size_t)>;
|
||||
|
||||
template <class Self>
|
||||
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER(coro_)
|
||||
redis_stream<Executor> stream_;
|
||||
|
||||
// Notice we use a timer to simulate a condition-variable. It is
|
||||
// also more suitable than a channel and the notify operation does
|
||||
// not suspend.
|
||||
timer_type writer_timer_;
|
||||
timer_type reconnect_timer_; // to wait the reconnection period
|
||||
receive_channel_type receive_channel_;
|
||||
health_checker_type health_checker_;
|
||||
resp3_handshaker_type handshaker_;
|
||||
|
||||
config cfg_;
|
||||
multiplexer mpx_;
|
||||
connection_logger logger_;
|
||||
read_buffer read_buffer_;
|
||||
|
||||
using executor_type = Executor;
|
||||
|
||||
executor_type get_executor() noexcept { return writer_timer_.get_executor(); }
|
||||
|
||||
struct exec_op {
|
||||
connection_impl* obj_ = nullptr;
|
||||
std::shared_ptr<exec_notifier_type> notifier_ = nullptr;
|
||||
exec_fsm fsm_;
|
||||
|
||||
template <class Self>
|
||||
void operator()(Self& self, system::error_code = {}, std::size_t = 0)
|
||||
{
|
||||
tmp_ = buf_.size();
|
||||
buf_.grow(size_);
|
||||
while (true) {
|
||||
// Invoke the state machine
|
||||
auto act = fsm_.resume(obj_->is_open(), self.get_cancellation_state().cancelled());
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
|
||||
if (ec) {
|
||||
self.complete(ec, 0);
|
||||
return;
|
||||
// Do what the FSM said
|
||||
switch (act.type()) {
|
||||
case exec_action_type::setup_cancellation:
|
||||
self.reset_cancellation_state(asio::enable_total_cancellation());
|
||||
continue; // this action does not require yielding
|
||||
case exec_action_type::immediate:
|
||||
asio::async_immediate(self.get_io_executor(), std::move(self));
|
||||
return;
|
||||
case exec_action_type::notify_writer:
|
||||
obj_->writer_timer_.cancel();
|
||||
continue; // this action does not require yielding
|
||||
case exec_action_type::wait_for_response:
|
||||
notifier_->async_receive(std::move(self));
|
||||
return;
|
||||
case exec_action_type::cancel_run:
|
||||
obj_->cancel(operation::run);
|
||||
continue; // this action does not require yielding
|
||||
case exec_action_type::done:
|
||||
notifier_.reset();
|
||||
self.complete(act.error(), act.bytes_read());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
buf_.shrink(buf_.size() - tmp_ - n);
|
||||
self.complete({}, n);
|
||||
connection_impl(Executor&& ex, asio::ssl::context&& ctx, logger&& lgr)
|
||||
: stream_{ex, std::move(ctx)}
|
||||
, writer_timer_{ex}
|
||||
, reconnect_timer_{ex}
|
||||
, receive_channel_{ex, 256}
|
||||
, health_checker_{ex}
|
||||
, logger_{std::move(lgr)}
|
||||
{
|
||||
set_receive_adapter(any_adapter{ignore});
|
||||
writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
|
||||
|
||||
// Reserve some memory to avoid excessive memory allocations in
|
||||
// the first reads.
|
||||
read_buffer_.reserve(4096u);
|
||||
}
|
||||
|
||||
void cancel(operation op)
|
||||
{
|
||||
switch (op) {
|
||||
case operation::resolve: stream_.cancel_resolve(); break;
|
||||
case operation::exec: mpx_.cancel_waiting(); break;
|
||||
case operation::reconnection:
|
||||
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
|
||||
break;
|
||||
case operation::run: cancel_run(); break;
|
||||
case operation::receive: receive_channel_.cancel(); break;
|
||||
case operation::health_check: health_checker_.cancel(); break;
|
||||
case operation::all:
|
||||
stream_.cancel_resolve();
|
||||
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
|
||||
health_checker_.cancel();
|
||||
cancel_run(); // run
|
||||
receive_channel_.cancel(); // receive
|
||||
mpx_.cancel_waiting(); // exec
|
||||
break;
|
||||
default: /* ignore */;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
|
||||
auto async_append_some(
|
||||
AsyncReadStream& stream,
|
||||
DynamicBuffer buffer,
|
||||
std::size_t size,
|
||||
CompletionToken&& token)
|
||||
{
|
||||
return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
|
||||
append_some_op<AsyncReadStream, DynamicBuffer>{stream, buffer, size},
|
||||
token,
|
||||
stream);
|
||||
}
|
||||
void cancel_run()
|
||||
{
|
||||
stream_.close();
|
||||
writer_timer_.cancel();
|
||||
receive_channel_.cancel();
|
||||
mpx_.cancel_on_conn_lost();
|
||||
}
|
||||
|
||||
bool is_open() const noexcept { return stream_.is_open(); }
|
||||
|
||||
bool will_reconnect() const noexcept
|
||||
{
|
||||
return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();
|
||||
}
|
||||
|
||||
template <class CompletionToken>
|
||||
auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token)
|
||||
{
|
||||
auto notifier = std::make_shared<exec_notifier_type>(get_executor(), 1);
|
||||
auto info = make_elem(req, std::move(adapter));
|
||||
|
||||
info->set_done_callback([notifier]() {
|
||||
notifier->try_send(std::error_code{}, 0);
|
||||
});
|
||||
|
||||
return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
|
||||
exec_op{this, notifier, exec_fsm(mpx_, std::move(info))},
|
||||
token,
|
||||
writer_timer_);
|
||||
}
|
||||
|
||||
void set_receive_adapter(any_adapter adapter) { mpx_.set_receive_adapter(std::move(adapter)); }
|
||||
};
|
||||
|
||||
template <class Executor>
|
||||
using exec_notifier_type = asio::experimental::channel<
|
||||
Executor,
|
||||
void(system::error_code, std::size_t)>;
|
||||
|
||||
template <class Conn>
|
||||
struct exec_op {
|
||||
using executor_type = typename Conn::executor_type;
|
||||
|
||||
Conn* conn_ = nullptr;
|
||||
std::shared_ptr<exec_notifier_type<executor_type>> notifier_ = nullptr;
|
||||
detail::exec_fsm fsm_;
|
||||
|
||||
template <class Self>
|
||||
void operator()(Self& self, system::error_code = {}, std::size_t = 0)
|
||||
{
|
||||
while (true) {
|
||||
// Invoke the state machine
|
||||
auto act = fsm_.resume(conn_->is_open(), self.get_cancellation_state().cancelled());
|
||||
|
||||
// Do what the FSM said
|
||||
switch (act.type()) {
|
||||
case detail::exec_action_type::setup_cancellation:
|
||||
self.reset_cancellation_state(asio::enable_total_cancellation());
|
||||
continue; // this action does not require yielding
|
||||
case detail::exec_action_type::immediate:
|
||||
asio::async_immediate(self.get_io_executor(), std::move(self));
|
||||
return;
|
||||
case detail::exec_action_type::notify_writer:
|
||||
conn_->writer_timer_.cancel();
|
||||
continue; // this action does not require yielding
|
||||
case detail::exec_action_type::wait_for_response:
|
||||
notifier_->async_receive(std::move(self));
|
||||
return;
|
||||
case detail::exec_action_type::cancel_run:
|
||||
conn_->cancel(operation::run);
|
||||
continue; // this action does not require yielding
|
||||
case detail::exec_action_type::done:
|
||||
notifier_.reset();
|
||||
self.complete(act.error(), act.bytes_read());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class Conn>
|
||||
struct writer_op {
|
||||
Conn* conn_;
|
||||
connection_impl<Executor>* conn_;
|
||||
asio::coroutine coro{};
|
||||
|
||||
template <class Self>
|
||||
@@ -207,33 +257,20 @@ struct writer_op {
|
||||
}
|
||||
};
|
||||
|
||||
template <class Conn>
|
||||
template <class Executor>
|
||||
struct reader_op {
|
||||
using dyn_buffer_type = asio::dynamic_string_buffer<
|
||||
char,
|
||||
std::char_traits<char>,
|
||||
std::allocator<char>>;
|
||||
|
||||
// TODO: Move this to config so the user can fine tune?
|
||||
static constexpr std::size_t buffer_growth_hint = 4096;
|
||||
|
||||
Conn* conn_;
|
||||
detail::reader_fsm fsm_;
|
||||
connection_impl<Executor>* conn_;
|
||||
reader_fsm fsm_;
|
||||
|
||||
public:
|
||||
reader_op(Conn& conn) noexcept
|
||||
reader_op(connection_impl<Executor>& conn) noexcept
|
||||
: conn_{&conn}
|
||||
, fsm_{conn.mpx_}
|
||||
, fsm_{conn.read_buffer_, conn.mpx_}
|
||||
{ }
|
||||
|
||||
template <class Self>
|
||||
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
|
||||
{
|
||||
using dyn_buffer_type = asio::dynamic_string_buffer<
|
||||
char,
|
||||
std::char_traits<char>,
|
||||
std::allocator<char>>;
|
||||
|
||||
for (;;) {
|
||||
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());
|
||||
|
||||
@@ -245,11 +282,10 @@ public:
|
||||
continue;
|
||||
case reader_fsm::action::type::needs_more:
|
||||
case reader_fsm::action::type::append_some:
|
||||
async_append_some(
|
||||
conn_->stream_,
|
||||
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
|
||||
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
|
||||
std::move(self));
|
||||
{
|
||||
auto const buf = conn_->read_buffer_.get_append_buffer();
|
||||
conn_->stream_.async_read_some(asio::buffer(buf), std::move(self));
|
||||
}
|
||||
return;
|
||||
case reader_fsm::action::type::notify_push_receiver:
|
||||
if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
|
||||
@@ -278,17 +314,35 @@ inline system::error_code check_config(const config& cfg)
|
||||
return system::error_code{};
|
||||
}
|
||||
|
||||
template <class Conn>
|
||||
template <class Executor>
|
||||
class run_op {
|
||||
private:
|
||||
Conn* conn_ = nullptr;
|
||||
connection_impl<Executor>* conn_ = nullptr;
|
||||
asio::coroutine coro_{};
|
||||
system::error_code stored_ec_;
|
||||
|
||||
using order_t = std::array<std::size_t, 5>;
|
||||
|
||||
template <class CompletionToken>
|
||||
auto reader(CompletionToken&& token)
|
||||
{
|
||||
return asio::async_compose<CompletionToken, void(system::error_code)>(
|
||||
reader_op<Executor>{*conn_},
|
||||
std::forward<CompletionToken>(token),
|
||||
conn_->writer_timer_);
|
||||
}
|
||||
|
||||
template <class CompletionToken>
|
||||
auto writer(CompletionToken&& token)
|
||||
{
|
||||
return asio::async_compose<CompletionToken, void(system::error_code)>(
|
||||
writer_op<Executor>{conn_},
|
||||
std::forward<CompletionToken>(token),
|
||||
conn_->writer_timer_);
|
||||
}
|
||||
|
||||
public:
|
||||
run_op(Conn* conn) noexcept
|
||||
run_op(connection_impl<Executor>* conn) noexcept
|
||||
: conn_{conn}
|
||||
{ }
|
||||
|
||||
@@ -342,6 +396,7 @@ public:
|
||||
|
||||
// If we were successful, run all the connection tasks
|
||||
if (!ec) {
|
||||
conn_->read_buffer_.clear();
|
||||
conn_->mpx_.reset();
|
||||
|
||||
// Note: Order is important here because the writer might
|
||||
@@ -359,10 +414,10 @@ public:
|
||||
return conn_->health_checker_.async_check_timeout(*conn_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->reader(token);
|
||||
return this->reader(token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->writer(token);
|
||||
return this->writer(token);
|
||||
})
|
||||
.async_wait(asio::experimental::wait_for_one_error(), std::move(self));
|
||||
|
||||
@@ -445,16 +500,11 @@ public:
|
||||
executor_type ex,
|
||||
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
|
||||
logger lgr = {})
|
||||
: stream_{ex, std::move(ctx)}
|
||||
, writer_timer_{ex}
|
||||
, reconnect_timer_{ex}
|
||||
, receive_channel_{ex, 256}
|
||||
, health_checker_{ex}
|
||||
, logger_{std::move(lgr)}
|
||||
{
|
||||
set_receive_response(ignore);
|
||||
writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
|
||||
}
|
||||
: impl_(std::make_unique<detail::connection_impl<Executor>>(
|
||||
std::move(ex),
|
||||
std::move(ctx),
|
||||
std::move(lgr)))
|
||||
{ }
|
||||
|
||||
/** @brief Constructor from an executor and a logger.
|
||||
*
|
||||
@@ -504,7 +554,7 @@ public:
|
||||
{ }
|
||||
|
||||
/// Returns the associated executor.
|
||||
executor_type get_executor() noexcept { return writer_timer_.get_executor(); }
|
||||
executor_type get_executor() noexcept { return impl_->writer_timer_.get_executor(); }
|
||||
|
||||
/** @brief Starts the underlying connection operations.
|
||||
*
|
||||
@@ -550,14 +600,15 @@ public:
|
||||
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
|
||||
auto async_run(config const& cfg, CompletionToken&& token = {})
|
||||
{
|
||||
cfg_ = cfg;
|
||||
health_checker_.set_config(cfg);
|
||||
handshaker_.set_config(cfg);
|
||||
impl_->cfg_ = cfg;
|
||||
impl_->health_checker_.set_config(cfg);
|
||||
impl_->handshaker_.set_config(cfg);
|
||||
impl_->read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size});
|
||||
|
||||
return asio::async_compose<CompletionToken, void(system::error_code)>(
|
||||
detail::run_op<this_type>{this},
|
||||
detail::run_op<Executor>{impl_.get()},
|
||||
token,
|
||||
writer_timer_);
|
||||
impl_->writer_timer_);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -636,10 +687,10 @@ public:
|
||||
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
|
||||
auto async_receive(CompletionToken&& token = {})
|
||||
{
|
||||
return receive_channel_.async_receive(std::forward<CompletionToken>(token));
|
||||
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/** @brief Receives server pushes synchronously without blocking.
|
||||
/** @brief Receives server> pushes synchronously without blocking.
|
||||
*
|
||||
* Receives a server push synchronously by calling `try_receive` on
|
||||
* the underlying channel. If the operation fails because
|
||||
@@ -658,7 +709,7 @@ public:
|
||||
size = n;
|
||||
};
|
||||
|
||||
auto const res = receive_channel_.try_receive(f);
|
||||
auto const res = impl_->receive_channel_.try_receive(f);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
@@ -710,7 +761,10 @@ public:
|
||||
class CompletionToken = asio::default_completion_token_t<executor_type>>
|
||||
auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
|
||||
{
|
||||
return this->async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
|
||||
return this->async_exec(
|
||||
req,
|
||||
any_adapter{resp},
|
||||
std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/** @brief Executes commands on the Redis server asynchronously.
|
||||
@@ -754,24 +808,7 @@ public:
|
||||
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
|
||||
auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token = {})
|
||||
{
|
||||
auto& adapter_impl = adapter.impl_;
|
||||
BOOST_ASSERT_MSG(
|
||||
req.get_expected_responses() <= adapter_impl.supported_response_size,
|
||||
"Request and response have incompatible sizes.");
|
||||
|
||||
auto notifier = std::make_shared<detail::exec_notifier_type<executor_type>>(
|
||||
get_executor(),
|
||||
1);
|
||||
auto info = detail::make_elem(req, std::move(adapter_impl.adapt_fn));
|
||||
|
||||
info->set_done_callback([notifier]() {
|
||||
notifier->try_send(std::error_code{}, 0);
|
||||
});
|
||||
|
||||
return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
|
||||
detail::exec_op<this_type>{this, notifier, detail::exec_fsm(mpx_, std::move(info))},
|
||||
token,
|
||||
writer_timer_);
|
||||
return impl_->async_exec(req, std::move(adapter), std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/** @brief Cancel operations.
|
||||
@@ -785,36 +822,12 @@ public:
|
||||
*
|
||||
* @param op The operation to be cancelled.
|
||||
*/
|
||||
void cancel(operation op = operation::all)
|
||||
{
|
||||
switch (op) {
|
||||
case operation::resolve: stream_.cancel_resolve(); break;
|
||||
case operation::exec: mpx_.cancel_waiting(); break;
|
||||
case operation::reconnection:
|
||||
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
|
||||
break;
|
||||
case operation::run: cancel_run(); break;
|
||||
case operation::receive: receive_channel_.cancel(); break;
|
||||
case operation::health_check: health_checker_.cancel(); break;
|
||||
case operation::all:
|
||||
stream_.cancel_resolve();
|
||||
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
|
||||
health_checker_.cancel();
|
||||
cancel_run(); // run
|
||||
receive_channel_.cancel(); // receive
|
||||
mpx_.cancel_waiting(); // exec
|
||||
break;
|
||||
default: /* ignore */;
|
||||
}
|
||||
}
|
||||
void cancel(operation op = operation::all) { impl_->cancel(op); }
|
||||
|
||||
auto run_is_canceled() const noexcept { return mpx_.get_cancel_run_state(); }
|
||||
auto run_is_canceled() const noexcept { return impl_->mpx_.get_cancel_run_state(); }
|
||||
|
||||
/// Returns true if the connection will try to reconnect if an error is encountered.
|
||||
bool will_reconnect() const noexcept
|
||||
{
|
||||
return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();
|
||||
}
|
||||
bool will_reconnect() const noexcept { return impl_->will_reconnect(); }
|
||||
|
||||
/**
|
||||
* @brief (Deprecated) Returns the ssl context.
|
||||
@@ -828,7 +841,10 @@ public:
|
||||
BOOST_DEPRECATED(
|
||||
"ssl::context has no const methods, so this function should not be called. Set up any "
|
||||
"required TLS configuration before passing the ssl::context to the connection's constructor.")
|
||||
asio::ssl::context const& get_ssl_context() const noexcept { return stream_.get_ssl_context(); }
|
||||
asio::ssl::context const& get_ssl_context() const noexcept
|
||||
{
|
||||
return impl_->stream_.get_ssl_context();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief (Deprecated) Resets the underlying stream.
|
||||
@@ -854,7 +870,7 @@ public:
|
||||
BOOST_DEPRECATED(
|
||||
"Accessing the underlying stream is deprecated and will be removed in the next release. Use "
|
||||
"the other member functions to interact with the connection.")
|
||||
auto& next_layer() noexcept { return stream_.next_layer(); }
|
||||
auto& next_layer() noexcept { return impl_->stream_.next_layer(); }
|
||||
|
||||
/**
|
||||
* @brief (Deprecated) Returns a reference to the next layer.
|
||||
@@ -870,17 +886,17 @@ public:
|
||||
BOOST_DEPRECATED(
|
||||
"Accessing the underlying stream is deprecated and will be removed in the next release. Use "
|
||||
"the other member functions to interact with the connection.")
|
||||
auto const& next_layer() const noexcept { return stream_.next_layer(); }
|
||||
auto const& next_layer() const noexcept { return impl_->stream_.next_layer(); }
|
||||
|
||||
/// Sets the response object of @ref async_receive operations.
|
||||
template <class Response>
|
||||
void set_receive_response(Response& response)
|
||||
void set_receive_response(Response& resp)
|
||||
{
|
||||
mpx_.set_receive_response(response);
|
||||
impl_->set_receive_adapter(any_adapter{resp});
|
||||
}
|
||||
|
||||
/// Returns connection usage information.
|
||||
usage get_usage() const noexcept { return mpx_.get_usage(); }
|
||||
usage get_usage() const noexcept { return impl_->mpx_.get_usage(); }
|
||||
|
||||
private:
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
@@ -893,65 +909,17 @@ private:
|
||||
using health_checker_type = detail::health_checker<Executor>;
|
||||
using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
|
||||
|
||||
auto use_ssl() const noexcept { return cfg_.use_ssl; }
|
||||
|
||||
void cancel_run()
|
||||
{
|
||||
stream_.close();
|
||||
writer_timer_.cancel();
|
||||
receive_channel_.cancel();
|
||||
mpx_.cancel_on_conn_lost();
|
||||
}
|
||||
auto use_ssl() const noexcept { return impl_->cfg_.use_ssl; }
|
||||
|
||||
// Used by both this class and connection
|
||||
void set_stderr_logger(logger::level lvl, const config& cfg)
|
||||
{
|
||||
logger_.reset(detail::make_stderr_logger(lvl, cfg.log_prefix));
|
||||
impl_->logger_.reset(detail::make_stderr_logger(lvl, cfg.log_prefix));
|
||||
}
|
||||
|
||||
template <class> friend struct detail::reader_op;
|
||||
template <class> friend struct detail::writer_op;
|
||||
template <class> friend struct detail::exec_op;
|
||||
template <class, class> friend struct detail::hello_op;
|
||||
template <class, class> friend class detail::ping_op;
|
||||
template <class> friend class detail::run_op;
|
||||
template <class, class> friend class detail::check_timeout_op;
|
||||
friend class connection;
|
||||
|
||||
template <class CompletionToken>
|
||||
auto reader(CompletionToken&& token)
|
||||
{
|
||||
return asio::async_compose<CompletionToken, void(system::error_code)>(
|
||||
detail::reader_op<this_type>{*this},
|
||||
std::forward<CompletionToken>(token),
|
||||
writer_timer_);
|
||||
}
|
||||
|
||||
template <class CompletionToken>
|
||||
auto writer(CompletionToken&& token)
|
||||
{
|
||||
return asio::async_compose<CompletionToken, void(system::error_code)>(
|
||||
detail::writer_op<this_type>{this},
|
||||
std::forward<CompletionToken>(token),
|
||||
writer_timer_);
|
||||
}
|
||||
|
||||
bool is_open() const noexcept { return stream_.is_open(); }
|
||||
|
||||
detail::redis_stream<Executor> stream_;
|
||||
|
||||
// Notice we use a timer to simulate a condition-variable. It is
|
||||
// also more suitable than a channel and the notify operation does
|
||||
// not suspend.
|
||||
timer_type writer_timer_;
|
||||
timer_type reconnect_timer_; // to wait the reconnection period
|
||||
receive_channel_type receive_channel_;
|
||||
health_checker_type health_checker_;
|
||||
resp3_handshaker_type handshaker_;
|
||||
|
||||
config cfg_;
|
||||
detail::multiplexer mpx_;
|
||||
detail::connection_logger logger_;
|
||||
std::unique_ptr<detail::connection_impl<Executor>> impl_;
|
||||
};
|
||||
|
||||
/** @brief A basic_connection that type erases the executor.
|
||||
@@ -1101,7 +1069,10 @@ public:
|
||||
template <class Response = ignore_t, class CompletionToken = asio::deferred_t>
|
||||
auto async_exec(request const& req, Response& resp = ignore, CompletionToken&& token = {})
|
||||
{
|
||||
return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
|
||||
return async_exec(
|
||||
req,
|
||||
any_adapter{resp},
|
||||
std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1136,7 +1107,7 @@ public:
|
||||
"the other member functions to interact with the connection.")
|
||||
asio::ssl::stream<asio::ip::tcp::socket>& next_layer() noexcept
|
||||
{
|
||||
return impl_.stream_.next_layer();
|
||||
return impl_.impl_->stream_.next_layer();
|
||||
}
|
||||
|
||||
/// (Deprecated) Calls @ref boost::redis::basic_connection::next_layer.
|
||||
@@ -1145,7 +1116,7 @@ public:
|
||||
"the other member functions to interact with the connection.")
|
||||
asio::ssl::stream<asio::ip::tcp::socket> const& next_layer() const noexcept
|
||||
{
|
||||
return impl_.stream_.next_layer();
|
||||
return impl_.impl_->stream_.next_layer();
|
||||
}
|
||||
|
||||
/// @copydoc basic_connection::reset_stream
|
||||
@@ -1170,7 +1141,7 @@ public:
|
||||
"required TLS configuration before passing the ssl::context to the connection's constructor.")
|
||||
asio::ssl::context const& get_ssl_context() const noexcept
|
||||
{
|
||||
return impl_.stream_.get_ssl_context();
|
||||
return impl_.impl_->stream_.get_ssl_context();
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@@ -24,11 +24,11 @@
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
template <class HealthChecker, class Connection>
|
||||
template <class HealthChecker, class ConnectionImpl>
|
||||
class ping_op {
|
||||
public:
|
||||
HealthChecker* checker_ = nullptr;
|
||||
Connection* conn_ = nullptr;
|
||||
ConnectionImpl* conn_ = nullptr;
|
||||
asio::coroutine coro_{};
|
||||
|
||||
template <class Self>
|
||||
@@ -51,7 +51,10 @@ public:
|
||||
}
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
|
||||
conn_->async_exec(
|
||||
checker_->req_,
|
||||
any_adapter{checker_->resp_},
|
||||
std::move(self));
|
||||
if (ec) {
|
||||
conn_->logger_.trace("ping_op (3)", ec);
|
||||
checker_->wait_timer_.cancel();
|
||||
@@ -155,11 +158,11 @@ public:
|
||||
wait_timer_.cancel();
|
||||
}
|
||||
|
||||
template <class Connection, class CompletionToken>
|
||||
auto async_ping(Connection& conn, CompletionToken token)
|
||||
template <class ConnectionImpl, class CompletionToken>
|
||||
auto async_ping(ConnectionImpl& conn, CompletionToken token)
|
||||
{
|
||||
return asio::async_compose<CompletionToken, void(system::error_code)>(
|
||||
ping_op<health_checker, Connection>{this, &conn},
|
||||
ping_op<health_checker, ConnectionImpl>{this, &conn},
|
||||
token,
|
||||
conn,
|
||||
ping_timer_);
|
||||
|
||||
@@ -9,12 +9,13 @@
|
||||
|
||||
#include <boost/redis/adapter/adapt.hpp>
|
||||
#include <boost/redis/adapter/any_adapter.hpp>
|
||||
#include <boost/redis/config.hpp>
|
||||
#include <boost/redis/operation.hpp>
|
||||
#include <boost/redis/detail/read_buffer.hpp>
|
||||
#include <boost/redis/resp3/node.hpp>
|
||||
#include <boost/redis/resp3/parser.hpp>
|
||||
#include <boost/redis/resp3/type.hpp>
|
||||
#include <boost/redis/usage.hpp>
|
||||
|
||||
#include <boost/asio/experimental/channel.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <deque>
|
||||
@@ -32,14 +33,11 @@ namespace detail {
|
||||
|
||||
using tribool = std::optional<bool>;
|
||||
|
||||
struct multiplexer {
|
||||
using adapter_type = std::function<void(resp3::node_view const&, system::error_code&)>;
|
||||
using pipeline_adapter_type = std::function<
|
||||
void(std::size_t, resp3::node_view const&, system::error_code&)>;
|
||||
|
||||
class multiplexer {
|
||||
public:
|
||||
struct elem {
|
||||
public:
|
||||
explicit elem(request const& req, pipeline_adapter_type adapter);
|
||||
explicit elem(request const& req, any_adapter adapter);
|
||||
|
||||
void set_done_callback(std::function<void()> f) noexcept { done_ = std::move(f); };
|
||||
|
||||
@@ -91,7 +89,7 @@ struct multiplexer {
|
||||
|
||||
auto commit_response(std::size_t read_size) -> void;
|
||||
|
||||
auto get_adapter() -> adapter_type& { return adapter_; }
|
||||
auto get_adapter() -> any_adapter& { return adapter_; }
|
||||
|
||||
private:
|
||||
enum class status
|
||||
@@ -103,8 +101,7 @@ struct multiplexer {
|
||||
};
|
||||
|
||||
request const* req_;
|
||||
adapter_type adapter_;
|
||||
|
||||
any_adapter adapter_;
|
||||
std::function<void()> done_;
|
||||
|
||||
// Contains the number of commands that haven't been read yet.
|
||||
@@ -127,7 +124,8 @@ struct multiplexer {
|
||||
// If the tribool contains no value more data is needed, otherwise
|
||||
// if the value is true the message consumed is a push.
|
||||
[[nodiscard]]
|
||||
auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;
|
||||
auto consume_next(std::string_view data, system::error_code& ec)
|
||||
-> std::pair<tribool, std::size_t>;
|
||||
|
||||
auto add(std::shared_ptr<elem> const& ptr) -> void;
|
||||
auto reset() -> void;
|
||||
@@ -156,27 +154,7 @@ struct multiplexer {
|
||||
return std::string_view{write_buffer_};
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_read_buffer() noexcept -> std::string&
|
||||
{
|
||||
return read_buffer_;
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_read_buffer() const noexcept -> std::string const&
|
||||
{
|
||||
return read_buffer_;
|
||||
}
|
||||
|
||||
// TODO: Change signature to receive an adapter instead of a
|
||||
// response.
|
||||
template <class Response>
|
||||
void set_receive_response(Response& response)
|
||||
{
|
||||
using namespace boost::redis::adapter;
|
||||
auto g = boost_redis_adapt(response);
|
||||
receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
|
||||
}
|
||||
void set_receive_adapter(any_adapter adapter);
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_usage() const noexcept -> usage
|
||||
@@ -191,28 +169,28 @@ private:
|
||||
[[nodiscard]]
|
||||
auto is_waiting_response() const noexcept -> bool;
|
||||
|
||||
[[nodiscard]]
|
||||
auto on_finish_parsing(bool is_push) -> std::size_t;
|
||||
void commit_usage(bool is_push, std::size_t size);
|
||||
|
||||
[[nodiscard]]
|
||||
auto is_next_push() const noexcept -> bool;
|
||||
auto is_next_push(std::string_view data) const noexcept -> bool;
|
||||
|
||||
// Releases the number of requests that have been released.
|
||||
[[nodiscard]]
|
||||
auto release_push_requests() -> std::size_t;
|
||||
|
||||
std::string read_buffer_;
|
||||
[[nodiscard]]
|
||||
tribool consume_next_impl(std::string_view data, system::error_code& ec);
|
||||
|
||||
std::string write_buffer_;
|
||||
std::deque<std::shared_ptr<elem>> reqs_;
|
||||
resp3::parser parser_{};
|
||||
bool on_push_ = false;
|
||||
bool cancel_run_called_ = false;
|
||||
usage usage_;
|
||||
adapter_type receive_adapter_;
|
||||
any_adapter receive_adapter_;
|
||||
};
|
||||
|
||||
auto make_elem(request const& req, multiplexer::pipeline_adapter_type adapter)
|
||||
-> std::shared_ptr<multiplexer::elem>;
|
||||
auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr<multiplexer::elem>;
|
||||
|
||||
} // namespace detail
|
||||
} // namespace boost::redis
|
||||
|
||||
65
include/boost/redis/detail/read_buffer.hpp
Normal file
65
include/boost/redis/detail/read_buffer.hpp
Normal file
@@ -0,0 +1,65 @@
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#ifndef BOOST_REDIS_READ_BUFFER_HPP
|
||||
#define BOOST_REDIS_READ_BUFFER_HPP
|
||||
|
||||
#include <boost/core/span.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include <cstddef>
|
||||
#include <string_view>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
class read_buffer {
|
||||
public:
|
||||
using span_type = span<char>;
|
||||
|
||||
// See config.hpp for the meaning of these parameters.
|
||||
struct config {
|
||||
std::size_t read_buffer_append_size = 4096u;
|
||||
std::size_t max_read_size = static_cast<std::size_t>(-1);
|
||||
};
|
||||
|
||||
[[nodiscard]]
|
||||
auto prepare_append() -> system::error_code;
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_append_buffer() noexcept -> span_type;
|
||||
|
||||
void commit_append(std::size_t read_size);
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_committed_buffer() const noexcept -> std::string_view;
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_committed_size() const noexcept -> std::size_t;
|
||||
|
||||
void clear();
|
||||
|
||||
// Consume committed data.
|
||||
auto consume_committed(std::size_t size) -> std::size_t;
|
||||
|
||||
void reserve(std::size_t n);
|
||||
|
||||
friend bool operator==(read_buffer const& lhs, read_buffer const& rhs);
|
||||
|
||||
friend bool operator!=(read_buffer const& lhs, read_buffer const& rhs);
|
||||
|
||||
void set_config(config const& cfg) noexcept { cfg_ = cfg; };
|
||||
|
||||
private:
|
||||
config cfg_ = config{};
|
||||
std::vector<char> buffer_;
|
||||
std::size_t append_buf_begin_ = 0;
|
||||
};
|
||||
|
||||
} // namespace boost::redis::detail
|
||||
|
||||
#endif // BOOST_REDIS_READ_BUFFER_HPP
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
#ifndef BOOST_REDIS_READER_FSM_HPP
|
||||
#define BOOST_REDIS_READER_FSM_HPP
|
||||
|
||||
#include <boost/redis/detail/multiplexer.hpp>
|
||||
|
||||
#include <boost/asio/cancellation_type.hpp>
|
||||
@@ -16,6 +15,8 @@
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
class read_buffer;
|
||||
|
||||
class reader_fsm {
|
||||
public:
|
||||
struct action {
|
||||
@@ -30,11 +31,11 @@ public:
|
||||
};
|
||||
|
||||
type type_ = type::setup_cancellation;
|
||||
std::size_t push_size_ = 0;
|
||||
std::size_t push_size_ = 0u;
|
||||
system::error_code ec_ = {};
|
||||
};
|
||||
|
||||
explicit reader_fsm(multiplexer& mpx) noexcept;
|
||||
explicit reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept;
|
||||
|
||||
action resume(
|
||||
std::size_t bytes_read,
|
||||
@@ -43,6 +44,7 @@ public:
|
||||
|
||||
private:
|
||||
int resume_point_{0};
|
||||
read_buffer* read_buffer_ = nullptr;
|
||||
action action_after_resume_;
|
||||
action::type next_read_type_ = action::type::append_some;
|
||||
multiplexer* mpx_ = nullptr;
|
||||
|
||||
@@ -26,10 +26,10 @@ void push_hello(config const& cfg, request& req);
|
||||
// TODO: Can we avoid this whole function whose only purpose is to
|
||||
// check for an error in the hello response and complete with an error
|
||||
// so that the parallel group that starts it can exit?
|
||||
template <class Handshaker, class Connection>
|
||||
template <class Handshaker, class ConnectionImpl>
|
||||
struct hello_op {
|
||||
Handshaker* handshaker_ = nullptr;
|
||||
Connection* conn_ = nullptr;
|
||||
ConnectionImpl* conn_ = nullptr;
|
||||
asio::coroutine coro_{};
|
||||
|
||||
template <class Self>
|
||||
@@ -42,8 +42,9 @@ struct hello_op {
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn_->async_exec(
|
||||
handshaker_->hello_req_,
|
||||
any_adapter(handshaker_->hello_resp_),
|
||||
any_adapter{handshaker_->hello_resp_},
|
||||
std::move(self));
|
||||
|
||||
conn_->logger_.on_hello(ec, handshaker_->hello_resp_);
|
||||
|
||||
if (ec) {
|
||||
@@ -68,11 +69,11 @@ class resp3_handshaker {
|
||||
public:
|
||||
void set_config(config const& cfg) { cfg_ = cfg; }
|
||||
|
||||
template <class Connection, class CompletionToken>
|
||||
auto async_hello(Connection& conn, CompletionToken token)
|
||||
template <class ConnectionImpl, class CompletionToken>
|
||||
auto async_hello(ConnectionImpl& conn, CompletionToken token)
|
||||
{
|
||||
return asio::async_compose<CompletionToken, void(system::error_code)>(
|
||||
hello_op<resp3_handshaker, Connection>{this, &conn},
|
||||
hello_op<resp3_handshaker, ConnectionImpl>{this, &conn},
|
||||
token,
|
||||
conn);
|
||||
}
|
||||
|
||||
@@ -88,6 +88,9 @@ enum class error
|
||||
|
||||
/// The configuration specified UNIX sockets with SSL, which is not supported.
|
||||
unix_sockets_ssl_unsupported,
|
||||
|
||||
/// Reading data from the socket would exceed the maximum size allowed of the read buffer.
|
||||
exceeds_maximum_read_buffer_size,
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -50,6 +50,9 @@ struct error_category_impl : system::error_category {
|
||||
"supported by the system.";
|
||||
case error::unix_sockets_ssl_unsupported:
|
||||
return "The configuration specified UNIX sockets with SSL, which is not supported.";
|
||||
case error::exceeds_maximum_read_buffer_size:
|
||||
return "Reading data from the socket would exceed the maximum size allowed of the read "
|
||||
"buffer.";
|
||||
default: BOOST_ASSERT(false); return "Boost.Redis error.";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,19 +11,14 @@
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
multiplexer::elem::elem(request const& req, pipeline_adapter_type adapter)
|
||||
multiplexer::elem::elem(request const& req, any_adapter adapter)
|
||||
: req_{&req}
|
||||
, adapter_{}
|
||||
, adapter_{std::move(adapter)}
|
||||
, remaining_responses_{req.get_expected_responses()}
|
||||
, status_{status::waiting}
|
||||
, ec_{}
|
||||
, read_size_{0}
|
||||
{
|
||||
adapter_ = [this, adapter](resp3::node_view const& nd, system::error_code& ec) {
|
||||
auto const i = req_->get_expected_responses() - remaining_responses_;
|
||||
adapter(i, nd, ec);
|
||||
};
|
||||
}
|
||||
{ }
|
||||
|
||||
auto multiplexer::elem::notify_error(system::error_code ec) noexcept -> void
|
||||
{
|
||||
@@ -81,7 +76,7 @@ void multiplexer::add(std::shared_ptr<elem> const& info)
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec)
|
||||
tribool multiplexer::consume_next_impl(std::string_view data, system::error_code& ec)
|
||||
{
|
||||
// We arrive here in two states:
|
||||
//
|
||||
@@ -93,18 +88,16 @@ std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec
|
||||
// 2. On a new message, in which case we have to determine
|
||||
// whether the next messag is a push or a response.
|
||||
//
|
||||
|
||||
BOOST_ASSERT(!data.empty());
|
||||
if (!on_push_) // Prepare for new message.
|
||||
on_push_ = is_next_push();
|
||||
on_push_ = is_next_push(data);
|
||||
|
||||
if (on_push_) {
|
||||
if (!resp3::parse(parser_, read_buffer_, receive_adapter_, ec))
|
||||
return std::make_pair(std::nullopt, 0);
|
||||
if (!resp3::parse(parser_, data, receive_adapter_, ec))
|
||||
return std::nullopt;
|
||||
|
||||
if (ec)
|
||||
return std::make_pair(std::make_optional(true), 0);
|
||||
|
||||
auto const size = on_finish_parsing(true);
|
||||
return std::make_pair(std::make_optional(true), size);
|
||||
return std::make_optional(true);
|
||||
}
|
||||
|
||||
BOOST_ASSERT_MSG(
|
||||
@@ -114,13 +107,13 @@ std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec
|
||||
BOOST_ASSERT(reqs_.front() != nullptr);
|
||||
BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0);
|
||||
|
||||
if (!resp3::parse(parser_, read_buffer_, reqs_.front()->get_adapter(), ec))
|
||||
return std::make_pair(std::nullopt, 0);
|
||||
if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec))
|
||||
return std::nullopt;
|
||||
|
||||
if (ec) {
|
||||
reqs_.front()->notify_error(ec);
|
||||
reqs_.pop_front();
|
||||
return std::make_pair(std::make_optional(false), 0);
|
||||
return std::make_optional(false);
|
||||
}
|
||||
|
||||
reqs_.front()->commit_response(parser_.get_consumed());
|
||||
@@ -130,14 +123,31 @@ std::pair<tribool, std::size_t> multiplexer::consume_next(system::error_code& ec
|
||||
reqs_.pop_front();
|
||||
}
|
||||
|
||||
auto const size = on_finish_parsing(false);
|
||||
return std::make_pair(std::make_optional(false), size);
|
||||
return std::make_optional(false);
|
||||
}
|
||||
|
||||
std::pair<tribool, std::size_t> multiplexer::consume_next(
|
||||
std::string_view data,
|
||||
system::error_code& ec)
|
||||
{
|
||||
auto const ret = consume_next_impl(data, ec);
|
||||
auto const consumed = parser_.get_consumed();
|
||||
if (ec) {
|
||||
return std::make_pair(ret, consumed);
|
||||
}
|
||||
|
||||
if (ret.has_value()) {
|
||||
parser_.reset();
|
||||
commit_usage(ret.value(), consumed);
|
||||
return std::make_pair(ret, consumed);
|
||||
}
|
||||
|
||||
return std::make_pair(std::nullopt, consumed);
|
||||
}
|
||||
|
||||
void multiplexer::reset()
|
||||
{
|
||||
write_buffer_.clear();
|
||||
read_buffer_.clear();
|
||||
parser_.reset();
|
||||
on_push_ = false;
|
||||
cancel_run_called_ = false;
|
||||
@@ -222,35 +232,29 @@ auto multiplexer::cancel_on_conn_lost() -> std::size_t
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::size_t multiplexer::on_finish_parsing(bool is_push)
|
||||
void multiplexer::commit_usage(bool is_push, std::size_t size)
|
||||
{
|
||||
if (is_push) {
|
||||
usage_.pushes_received += 1;
|
||||
usage_.push_bytes_received += parser_.get_consumed();
|
||||
usage_.push_bytes_received += size;
|
||||
on_push_ = false;
|
||||
} else {
|
||||
usage_.responses_received += 1;
|
||||
usage_.response_bytes_received += parser_.get_consumed();
|
||||
usage_.response_bytes_received += size;
|
||||
}
|
||||
|
||||
on_push_ = false;
|
||||
read_buffer_.erase(0, parser_.get_consumed());
|
||||
auto const size = parser_.get_consumed();
|
||||
parser_.reset();
|
||||
return size;
|
||||
}
|
||||
|
||||
bool multiplexer::is_next_push() const noexcept
|
||||
bool multiplexer::is_next_push(std::string_view data) const noexcept
|
||||
{
|
||||
BOOST_ASSERT(!read_buffer_.empty());
|
||||
|
||||
// Useful links to understand the heuristics below.
|
||||
//
|
||||
// - https://github.com/redis/redis/issues/11784
|
||||
// - https://github.com/redis/redis/issues/6426
|
||||
// - https://github.com/boostorg/redis/issues/170
|
||||
|
||||
// The message's resp3 type is a push.
|
||||
if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
|
||||
// Test if the message resp3 type is a push.
|
||||
BOOST_ASSERT(!data.empty());
|
||||
if (resp3::to_type(data.front()) == resp3::type::push)
|
||||
return true;
|
||||
|
||||
// This is non-push type and the requests queue is empty. I have
|
||||
@@ -305,8 +309,12 @@ bool multiplexer::is_waiting_response() const noexcept
|
||||
|
||||
bool multiplexer::is_writing() const noexcept { return !write_buffer_.empty(); }
|
||||
|
||||
auto make_elem(request const& req, multiplexer::pipeline_adapter_type adapter)
|
||||
-> std::shared_ptr<multiplexer::elem>
|
||||
void multiplexer::set_receive_adapter(any_adapter adapter)
|
||||
{
|
||||
receive_adapter_ = std::move(adapter);
|
||||
}
|
||||
|
||||
auto make_elem(request const& req, any_adapter adapter) -> std::shared_ptr<multiplexer::elem>
|
||||
{
|
||||
return std::make_shared<multiplexer::elem>(req, std::move(adapter));
|
||||
}
|
||||
|
||||
79
include/boost/redis/impl/read_buffer.ipp
Normal file
79
include/boost/redis/impl/read_buffer.ipp
Normal file
@@ -0,0 +1,79 @@
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <boost/redis/detail/read_buffer.hpp>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/core/make_span.hpp>
|
||||
|
||||
#include <utility>
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
system::error_code read_buffer::prepare_append()
|
||||
{
|
||||
BOOST_ASSERT(append_buf_begin_ == buffer_.size());
|
||||
|
||||
auto const new_size = append_buf_begin_ + cfg_.read_buffer_append_size;
|
||||
|
||||
if (new_size > cfg_.max_read_size) {
|
||||
return error::exceeds_maximum_read_buffer_size;
|
||||
}
|
||||
|
||||
buffer_.resize(new_size);
|
||||
return {};
|
||||
}
|
||||
|
||||
void read_buffer::commit_append(std::size_t read_size)
|
||||
{
|
||||
BOOST_ASSERT(buffer_.size() >= (append_buf_begin_ + read_size));
|
||||
buffer_.resize(append_buf_begin_ + read_size);
|
||||
append_buf_begin_ = buffer_.size();
|
||||
}
|
||||
|
||||
auto read_buffer::get_append_buffer() noexcept -> span_type
|
||||
{
|
||||
auto const size = buffer_.size();
|
||||
return make_span(buffer_.data() + append_buf_begin_, size - append_buf_begin_);
|
||||
}
|
||||
|
||||
auto read_buffer::get_committed_buffer() const noexcept -> std::string_view
|
||||
{
|
||||
BOOST_ASSERT(!buffer_.empty());
|
||||
return {buffer_.data(), append_buf_begin_};
|
||||
}
|
||||
|
||||
auto read_buffer::get_committed_size() const noexcept -> std::size_t { return append_buf_begin_; }
|
||||
|
||||
void read_buffer::clear()
|
||||
{
|
||||
buffer_.clear();
|
||||
append_buf_begin_ = 0;
|
||||
}
|
||||
|
||||
std::size_t read_buffer::consume_committed(std::size_t size)
|
||||
{
|
||||
// For convenience, if the requested size is larger than the
|
||||
// committed buffer we cap it to the maximum.
|
||||
if (size > append_buf_begin_)
|
||||
size = append_buf_begin_;
|
||||
|
||||
buffer_.erase(buffer_.begin(), buffer_.begin() + size);
|
||||
BOOST_ASSERT(append_buf_begin_ >= size);
|
||||
append_buf_begin_ -= size;
|
||||
return size;
|
||||
}
|
||||
|
||||
void read_buffer::reserve(std::size_t n) { buffer_.reserve(n); }
|
||||
|
||||
bool operator==(read_buffer const& lhs, read_buffer const& rhs)
|
||||
{
|
||||
return lhs.buffer_ == rhs.buffer_ && lhs.append_buf_begin_ == rhs.append_buf_begin_;
|
||||
}
|
||||
|
||||
bool operator!=(read_buffer const& lhs, read_buffer const& rhs) { return !(lhs == rhs); }
|
||||
|
||||
} // namespace boost::redis::detail
|
||||
@@ -6,12 +6,14 @@
|
||||
|
||||
#include <boost/redis/detail/coroutine.hpp>
|
||||
#include <boost/redis/detail/multiplexer.hpp>
|
||||
#include <boost/redis/detail/read_buffer.hpp>
|
||||
#include <boost/redis/detail/reader_fsm.hpp>
|
||||
|
||||
namespace boost::redis::detail {
|
||||
|
||||
reader_fsm::reader_fsm(multiplexer& mpx) noexcept
|
||||
: mpx_{&mpx}
|
||||
reader_fsm::reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept
|
||||
: read_buffer_{&rbuf}
|
||||
, mpx_{&mpx}
|
||||
{ }
|
||||
|
||||
reader_fsm::action reader_fsm::resume(
|
||||
@@ -24,22 +26,32 @@ reader_fsm::action reader_fsm::resume(
|
||||
BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation)
|
||||
|
||||
for (;;) {
|
||||
BOOST_REDIS_YIELD(resume_point_, 2, next_read_type_)
|
||||
ec = read_buffer_->prepare_append();
|
||||
if (ec) {
|
||||
action_after_resume_ = {action::type::done, 0, ec};
|
||||
BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run)
|
||||
return action_after_resume_;
|
||||
}
|
||||
|
||||
BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_)
|
||||
read_buffer_->commit_append(bytes_read);
|
||||
if (ec) {
|
||||
// TODO: If an error occurred but data was read (i.e.
|
||||
// bytes_read != 0) we should try to process that data and
|
||||
// deliver it to the user before calling cancel_run.
|
||||
action_after_resume_ = {action::type::done, bytes_read, ec};
|
||||
BOOST_REDIS_YIELD(resume_point_, 3, action::type::cancel_run)
|
||||
BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run)
|
||||
return action_after_resume_;
|
||||
}
|
||||
|
||||
next_read_type_ = action::type::append_some;
|
||||
while (!mpx_->get_read_buffer().empty()) {
|
||||
res_ = mpx_->consume_next(ec);
|
||||
while (read_buffer_->get_committed_size() != 0) {
|
||||
res_ = mpx_->consume_next(read_buffer_->get_committed_buffer(), ec);
|
||||
if (ec) {
|
||||
// TODO: Perhaps log what has not been consumed to aid
|
||||
// debugging.
|
||||
action_after_resume_ = {action::type::done, res_.second, ec};
|
||||
BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run)
|
||||
BOOST_REDIS_YIELD(resume_point_, 5, action::type::cancel_run)
|
||||
return action_after_resume_;
|
||||
}
|
||||
|
||||
@@ -48,6 +60,8 @@ reader_fsm::action reader_fsm::resume(
|
||||
break;
|
||||
}
|
||||
|
||||
read_buffer_->consume_committed(res_.second);
|
||||
|
||||
if (res_.first.value()) {
|
||||
BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second)
|
||||
if (ec) {
|
||||
|
||||
@@ -27,22 +27,10 @@ parser::parser() { reset(); }
|
||||
void parser::reset()
|
||||
{
|
||||
depth_ = 0;
|
||||
sizes_ = {{1}};
|
||||
bulk_length_ = (std::numeric_limits<std::size_t>::max)();
|
||||
sizes_ = default_sizes;
|
||||
bulk_length_ = default_bulk_length;
|
||||
bulk_ = type::invalid;
|
||||
consumed_ = 0;
|
||||
sizes_[0] = 2; // The sentinel must be more than 1.
|
||||
}
|
||||
|
||||
std::size_t parser::get_suggested_buffer_growth(std::size_t hint) const noexcept
|
||||
{
|
||||
if (!bulk_expected())
|
||||
return hint;
|
||||
|
||||
if (hint < bulk_length_ + 2)
|
||||
return bulk_length_ + 2;
|
||||
|
||||
return hint;
|
||||
}
|
||||
|
||||
std::size_t parser::get_consumed() const noexcept { return consumed_; }
|
||||
@@ -206,4 +194,13 @@ auto parser::consume_impl(type t, std::string_view elem, system::error_code& ec)
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool parser::is_parsing() const noexcept
|
||||
{
|
||||
auto const v = depth_ == 0 && sizes_ == default_sizes && bulk_length_ == default_bulk_length &&
|
||||
bulk_ == type::invalid && consumed_ == 0;
|
||||
|
||||
return !v;
|
||||
}
|
||||
|
||||
} // namespace boost::redis::resp3
|
||||
|
||||
@@ -27,6 +27,14 @@ public:
|
||||
static constexpr std::string_view sep = "\r\n";
|
||||
|
||||
private:
|
||||
using sizes_type = std::array<std::size_t, max_embedded_depth + 1>;
|
||||
|
||||
// sizes_[0] = 2 because the sentinel must be more than 1.
|
||||
static constexpr sizes_type default_sizes = {
|
||||
{2, 1, 1, 1, 1, 1}
|
||||
};
|
||||
static constexpr auto default_bulk_length = static_cast<std::size_t>(-1);
|
||||
|
||||
// The current depth. Simple data types will have depth 0, whereas
|
||||
// the elements of aggregates will have depth 1. Embedded types
|
||||
// will have increasing depth.
|
||||
@@ -35,7 +43,7 @@ private:
|
||||
// The parser supports up to 5 levels of nested structures. The
|
||||
// first element in the sizes stack is a sentinel and must be
|
||||
// different from 1.
|
||||
std::array<std::size_t, max_embedded_depth + 1> sizes_;
|
||||
sizes_type sizes_;
|
||||
|
||||
// Contains the length expected in the next bulk read.
|
||||
std::size_t bulk_length_;
|
||||
@@ -67,21 +75,26 @@ public:
|
||||
[[nodiscard]]
|
||||
auto done() const noexcept -> bool;
|
||||
|
||||
auto get_suggested_buffer_growth(std::size_t hint) const noexcept -> std::size_t;
|
||||
|
||||
auto get_consumed() const noexcept -> std::size_t;
|
||||
|
||||
auto consume(std::string_view view, system::error_code& ec) noexcept -> result;
|
||||
|
||||
void reset();
|
||||
|
||||
bool is_parsing() const noexcept;
|
||||
};
|
||||
|
||||
// Returns false if more data is needed. If true is returned the
|
||||
// parser is either done or an error occured, that can be checked on
|
||||
// ec.
|
||||
template <class Adapter>
|
||||
bool parse(resp3::parser& p, std::string_view const& msg, Adapter& adapter, system::error_code& ec)
|
||||
bool parse(parser& p, std::string_view const& msg, Adapter& adapter, system::error_code& ec)
|
||||
{
|
||||
// This if could be avoid with a state machine that jumps into the
|
||||
// correct position.
|
||||
if (!p.is_parsing())
|
||||
adapter.on_init();
|
||||
|
||||
while (!p.done()) {
|
||||
auto const res = p.consume(msg, ec);
|
||||
if (ec)
|
||||
@@ -90,11 +103,12 @@ bool parse(resp3::parser& p, std::string_view const& msg, Adapter& adapter, syst
|
||||
if (!res)
|
||||
return false;
|
||||
|
||||
adapter(res.value(), ec);
|
||||
adapter.on_node(res.value(), ec);
|
||||
if (ec)
|
||||
return true;
|
||||
}
|
||||
|
||||
adapter.on_done();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -108,6 +108,8 @@ namespace detail {
|
||||
template <class Adapter>
|
||||
void deserialize(std::string_view const& data, Adapter adapter, system::error_code& ec)
|
||||
{
|
||||
adapter.on_init();
|
||||
|
||||
parser parser;
|
||||
while (!parser.done()) {
|
||||
auto const res = parser.consume(data, ec);
|
||||
@@ -116,12 +118,14 @@ void deserialize(std::string_view const& data, Adapter adapter, system::error_co
|
||||
|
||||
BOOST_ASSERT(res.has_value());
|
||||
|
||||
adapter(res.value(), ec);
|
||||
adapter.on_node(res.value(), ec);
|
||||
if (ec)
|
||||
return;
|
||||
}
|
||||
|
||||
BOOST_ASSERT(parser.get_consumed() == std::size(data));
|
||||
|
||||
adapter.on_done();
|
||||
}
|
||||
|
||||
template <class Adapter>
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
@@ -10,7 +10,7 @@
|
||||
#include <boost/redis/adapter/result.hpp>
|
||||
#include <boost/redis/resp3/node.hpp>
|
||||
|
||||
#include <boost/system.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <boost/redis/impl/ignore.ipp>
|
||||
#include <boost/redis/impl/logger.ipp>
|
||||
#include <boost/redis/impl/multiplexer.ipp>
|
||||
#include <boost/redis/impl/read_buffer.ipp>
|
||||
#include <boost/redis/impl/reader_fsm.ipp>
|
||||
#include <boost/redis/impl/request.ipp>
|
||||
#include <boost/redis/impl/resp3_handshaker.ipp>
|
||||
|
||||
@@ -44,7 +44,6 @@ make_test(test_reader_fsm)
|
||||
|
||||
# Tests that require a real Redis server
|
||||
make_test(test_conn_quit)
|
||||
make_test(test_conn_tls)
|
||||
make_test(test_conn_exec_retry)
|
||||
make_test(test_conn_exec_error)
|
||||
make_test(test_run)
|
||||
@@ -55,9 +54,11 @@ make_test(test_conn_reconnect)
|
||||
make_test(test_conn_exec_cancel)
|
||||
make_test(test_conn_exec_cancel2)
|
||||
make_test(test_conn_echo_stress)
|
||||
make_test(test_conn_move)
|
||||
make_test(test_issue_50)
|
||||
make_test(test_issue_181)
|
||||
make_test(test_conversions)
|
||||
make_test(test_conn_tls)
|
||||
make_test(test_unix_sockets)
|
||||
|
||||
# Coverage
|
||||
|
||||
@@ -50,7 +50,6 @@ boost::redis::config make_test_config()
|
||||
{
|
||||
boost::redis::config cfg;
|
||||
cfg.addr.host = get_server_hostname();
|
||||
cfg.max_read_size = 1000000;
|
||||
return cfg;
|
||||
}
|
||||
|
||||
@@ -69,3 +68,10 @@ void run_coroutine_test(net::awaitable<void> op, std::chrono::steady_clock::dura
|
||||
throw std::runtime_error("Coroutine test did not finish");
|
||||
}
|
||||
#endif // BOOST_ASIO_HAS_CO_AWAIT
|
||||
|
||||
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data)
|
||||
{
|
||||
auto const buffer = rbuf.get_append_buffer();
|
||||
BOOST_ASSERT(data.size() <= buffer.size());
|
||||
std::copy(data.begin(), data.end(), buffer.begin());
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <boost/redis/connection.hpp>
|
||||
#include <boost/redis/detail/reader_fsm.hpp>
|
||||
#include <boost/redis/operation.hpp>
|
||||
|
||||
#include <boost/asio/awaitable.hpp>
|
||||
@@ -34,3 +35,5 @@ void run(
|
||||
boost::redis::config cfg = make_test_config(),
|
||||
boost::system::error_code ec = boost::asio::error::operation_aborted,
|
||||
boost::redis::operation op = boost::redis::operation::receive);
|
||||
|
||||
void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data);
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
@@ -16,6 +16,7 @@ using boost::redis::generic_response;
|
||||
using boost::redis::response;
|
||||
using boost::redis::ignore;
|
||||
using boost::redis::any_adapter;
|
||||
using boost::redis::any_adapter;
|
||||
|
||||
BOOST_AUTO_TEST_CASE(any_adapter_response_types)
|
||||
{
|
||||
@@ -34,13 +35,13 @@ BOOST_AUTO_TEST_CASE(any_adapter_copy_move)
|
||||
{
|
||||
// any_adapter can be copied/moved
|
||||
response<int, std::string> r;
|
||||
any_adapter ad1{r};
|
||||
auto ad1 = any_adapter{r};
|
||||
|
||||
// copy constructor
|
||||
any_adapter ad2{ad1};
|
||||
auto ad2 = any_adapter(ad1);
|
||||
|
||||
// move constructor
|
||||
any_adapter ad3{std::move(ad2)};
|
||||
auto ad3 = any_adapter(std::move(ad2));
|
||||
|
||||
// copy assignment
|
||||
BOOST_CHECK_NO_THROW(ad2 = ad1);
|
||||
|
||||
@@ -31,6 +31,7 @@ using boost::redis::ignore;
|
||||
using boost::redis::operation;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::any_adapter;
|
||||
using boost::system::error_code;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -229,7 +230,7 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter)
|
||||
|
||||
bool finished = false;
|
||||
|
||||
conn->async_exec(req, boost::redis::any_adapter(res), [&](error_code ec, std::size_t) {
|
||||
conn->async_exec(req, res, [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn->cancel();
|
||||
finished = true;
|
||||
@@ -242,4 +243,4 @@ BOOST_AUTO_TEST_CASE(exec_any_adapter)
|
||||
BOOST_TEST(std::get<0>(res).value() == "PONG");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace
|
||||
|
||||
@@ -28,10 +28,6 @@ using namespace boost::redis;
|
||||
|
||||
namespace {
|
||||
|
||||
// user tests
|
||||
// logging can be disabled
|
||||
// logging can be changed verbosity
|
||||
|
||||
template <class Conn>
|
||||
void run_with_invalid_config(net::io_context& ioc, Conn& conn)
|
||||
{
|
||||
|
||||
112
test/test_conn_move.cpp
Normal file
112
test/test_conn_move.cpp
Normal file
@@ -0,0 +1,112 @@
|
||||
//
|
||||
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
|
||||
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
|
||||
//
|
||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
|
||||
#include <boost/redis/connection.hpp>
|
||||
#include <boost/redis/request.hpp>
|
||||
#include <boost/redis/response.hpp>
|
||||
|
||||
#include <boost/asio/bind_executor.hpp>
|
||||
#include <boost/asio/error.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/core/lightweight_test.hpp>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
#include <cstddef>
|
||||
#include <string>
|
||||
|
||||
using boost::system::error_code;
|
||||
namespace net = boost::asio;
|
||||
using namespace boost::redis;
|
||||
|
||||
namespace {
|
||||
|
||||
// Move constructing a connection doesn't leave dangling pointers
|
||||
void test_conn_move_construct()
|
||||
{
|
||||
// Setup
|
||||
net::io_context ioc;
|
||||
connection conn_prev(ioc);
|
||||
connection conn(std::move(conn_prev));
|
||||
request req;
|
||||
req.push("PING", "something");
|
||||
response<std::string> res;
|
||||
|
||||
bool run_finished = false, exec_finished = false;
|
||||
|
||||
// Run the connection
|
||||
conn.async_run(make_test_config(), [&](error_code ec) {
|
||||
run_finished = true;
|
||||
BOOST_TEST_EQ(ec, net::error::operation_aborted);
|
||||
});
|
||||
|
||||
// Launch a PING
|
||||
conn.async_exec(req, res, [&](error_code ec, std::size_t) {
|
||||
exec_finished = true;
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
conn.cancel();
|
||||
});
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
|
||||
// Check
|
||||
BOOST_TEST(run_finished);
|
||||
BOOST_TEST(exec_finished);
|
||||
BOOST_TEST_EQ(std::get<0>(res).value(), "something");
|
||||
}
|
||||
|
||||
// Moving a connection is safe even when it's running,
|
||||
// and it doesn't leave dangling pointers
|
||||
void test_conn_move_assign_while_running()
|
||||
{
|
||||
// Setup
|
||||
net::io_context ioc;
|
||||
connection conn(ioc);
|
||||
connection conn2(ioc); // will be assigned to
|
||||
request req;
|
||||
req.push("PING", "something");
|
||||
response<std::string> res;
|
||||
|
||||
bool run_finished = false, exec_finished = false;
|
||||
|
||||
// Run the connection
|
||||
conn.async_run(make_test_config(), [&](error_code ec) {
|
||||
run_finished = true;
|
||||
BOOST_TEST_EQ(ec, net::error::operation_aborted);
|
||||
});
|
||||
|
||||
// Launch a PING. When it finishes, conn will be moved-from, and conn2 will be valid
|
||||
conn.async_exec(req, res, [&](error_code ec, std::size_t) {
|
||||
exec_finished = true;
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
conn2.cancel();
|
||||
});
|
||||
|
||||
// While the operations are running, perform a move
|
||||
net::post(net::bind_executor(ioc.get_executor(), [&] {
|
||||
conn2 = std::move(conn);
|
||||
}));
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
|
||||
// Check
|
||||
BOOST_TEST(run_finished);
|
||||
BOOST_TEST(exec_finished);
|
||||
BOOST_TEST_EQ(std::get<0>(res).value(), "something");
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int main()
|
||||
{
|
||||
test_conn_move_construct();
|
||||
test_conn_move_assign_while_running();
|
||||
|
||||
return boost::report_errors();
|
||||
}
|
||||
@@ -180,19 +180,15 @@ struct response_error_tag { };
|
||||
response_error_tag error_tag_obj;
|
||||
|
||||
struct response_error_adapter {
|
||||
void operator()(
|
||||
std::size_t,
|
||||
void on_init() { }
|
||||
void on_done() { }
|
||||
|
||||
void on_node(
|
||||
boost::redis::resp3::basic_node<std::string_view> const&,
|
||||
boost::system::error_code& ec)
|
||||
{
|
||||
ec = boost::redis::error::incompatible_size;
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_supported_response_size() const noexcept
|
||||
{
|
||||
return static_cast<std::size_t>(-1);
|
||||
}
|
||||
};
|
||||
|
||||
auto boost_redis_adapt(response_error_tag&) { return response_error_adapter{}; }
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
#include <boost/core/lightweight_test.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
@@ -81,10 +83,8 @@ struct elem_and_request {
|
||||
{
|
||||
// Empty requests are not valid. The request needs to be populated before creating the element
|
||||
req.push("get", "mykey");
|
||||
elm = std::make_shared<multiplexer::elem>(req, any_adapter{});
|
||||
|
||||
elm = std::make_shared<multiplexer::elem>(
|
||||
req,
|
||||
[](std::size_t, resp3::node_view const&, error_code&) { });
|
||||
elm->set_done_callback([this] {
|
||||
++done_calls;
|
||||
});
|
||||
@@ -117,8 +117,7 @@ void test_success()
|
||||
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
|
||||
|
||||
// Simulate a successful read
|
||||
mpx.get_read_buffer() = "$5\r\nhello\r\n";
|
||||
auto req_status = mpx.consume_next(ec);
|
||||
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push
|
||||
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
|
||||
@@ -159,10 +158,9 @@ void test_parse_error()
|
||||
// The second field should be a number (rather than the empty string).
|
||||
// Note that although part of the buffer was consumed, the multiplexer
|
||||
// currently throws this information away.
|
||||
mpx.get_read_buffer() = "*2\r\n$5\r\nhello\r\n:\r\n";
|
||||
auto req_status = mpx.consume_next(ec);
|
||||
auto req_status = mpx.consume_next("*2\r\n$5\r\nhello\r\n:\r\n", ec);
|
||||
BOOST_TEST_EQ(ec, error::empty_field);
|
||||
BOOST_TEST_EQ(req_status.second, 0u);
|
||||
BOOST_TEST_EQ(req_status.second, 15u);
|
||||
BOOST_TEST_EQ(input.done_calls, 1u);
|
||||
|
||||
// This will awaken the exec operation, and should complete the operation
|
||||
@@ -218,8 +216,7 @@ void test_not_connected()
|
||||
BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response
|
||||
|
||||
// Simulate a successful read
|
||||
mpx.get_read_buffer() = "$5\r\nhello\r\n";
|
||||
auto req_status = mpx.consume_next(ec);
|
||||
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push
|
||||
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
|
||||
@@ -342,8 +339,7 @@ void test_cancel_notwaiting_notterminal()
|
||||
BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name);
|
||||
|
||||
// Simulate a successful read
|
||||
mpx.get_read_buffer() = "$5\r\nhello\r\n";
|
||||
auto req_status = mpx.consume_next(ec);
|
||||
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
|
||||
BOOST_TEST_EQ_MSG(ec, error_code(), tc.name);
|
||||
BOOST_TEST_EQ_MSG(req_status.first.value(), false, tc.name); // it wasn't a push
|
||||
BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed
|
||||
|
||||
@@ -528,6 +528,7 @@ BOOST_AUTO_TEST_CASE(cover_error)
|
||||
check_error("boost.redis", boost::redis::error::sync_receive_push_failed);
|
||||
check_error("boost.redis", boost::redis::error::incompatible_node_depth);
|
||||
check_error("boost.redis", boost::redis::error::resp3_hello);
|
||||
check_error("boost.redis", boost::redis::error::exceeds_maximum_read_buffer_size);
|
||||
}
|
||||
|
||||
std::string get_type_as_str(boost::redis::resp3::type t)
|
||||
@@ -594,8 +595,12 @@ BOOST_AUTO_TEST_CASE(adapter)
|
||||
response<std::string, int, ignore_t> resp;
|
||||
|
||||
auto f = boost_redis_adapt(resp);
|
||||
f(0, resp3::basic_node<std::string_view>{type::simple_string, 1, 0, "Hello"}, ec);
|
||||
f(1, resp3::basic_node<std::string_view>{type::number, 1, 0, "42"}, ec);
|
||||
f.on_init();
|
||||
f.on_node(resp3::node_view{type::simple_string, 1, 0, "Hello"}, ec);
|
||||
f.on_done();
|
||||
f.on_init();
|
||||
f.on_node(resp3::node_view{type::number, 1, 0, "42"}, ec);
|
||||
f.on_done();
|
||||
|
||||
BOOST_CHECK_EQUAL(std::get<0>(resp).value(), "Hello");
|
||||
BOOST_TEST(!ec);
|
||||
@@ -613,7 +618,7 @@ BOOST_AUTO_TEST_CASE(adapter_as)
|
||||
|
||||
for (auto const& e : set_expected1a.value()) {
|
||||
error_code ec;
|
||||
adapter(e, ec);
|
||||
adapter.on_node(e, ec);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
#include <boost/redis/adapter/adapt.hpp>
|
||||
#include <boost/redis/adapter/any_adapter.hpp>
|
||||
#include <boost/redis/detail/multiplexer.hpp>
|
||||
#include <boost/redis/detail/read_buffer.hpp>
|
||||
#include <boost/redis/detail/resp3_handshaker.hpp>
|
||||
#include <boost/redis/resp3/node.hpp>
|
||||
#include <boost/redis/resp3/serialization.hpp>
|
||||
@@ -14,30 +15,38 @@
|
||||
#define BOOST_TEST_MODULE conn_quit
|
||||
#include <boost/test/included/unit_test.hpp>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
|
||||
using boost::redis::request;
|
||||
using boost::redis::config;
|
||||
using boost::redis::detail::push_hello;
|
||||
using boost::redis::response;
|
||||
using boost::redis::adapter::adapt2;
|
||||
using boost::redis::adapter::result;
|
||||
using boost::redis::resp3::detail::deserialize;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::config;
|
||||
using boost::redis::detail::multiplexer;
|
||||
using boost::redis::detail::push_hello;
|
||||
using boost::redis::generic_response;
|
||||
using boost::redis::ignore_t;
|
||||
using boost::redis::request;
|
||||
using boost::redis::resp3::detail::deserialize;
|
||||
using boost::redis::resp3::node;
|
||||
using boost::redis::resp3::to_string;
|
||||
using boost::redis::response;
|
||||
using boost::redis::any_adapter;
|
||||
using boost::system::error_code;
|
||||
|
||||
#define RESP3_SET_PART1 "~6\r\n+orange\r"
|
||||
#define RESP3_SET_PART2 "\n+apple\r\n+one"
|
||||
#define RESP3_SET_PART3 "\r\n+two\r"
|
||||
#define RESP3_SET_PART4 "\n+three\r\n+orange\r\n"
|
||||
char const* resp3_set = RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3 RESP3_SET_PART4;
|
||||
|
||||
BOOST_AUTO_TEST_CASE(low_level_sync_sans_io)
|
||||
{
|
||||
try {
|
||||
result<std::set<std::string>> resp;
|
||||
|
||||
char const* wire = "~6\r\n+orange\r\n+apple\r\n+one\r\n+two\r\n+three\r\n+orange\r\n";
|
||||
deserialize(wire, adapt2(resp));
|
||||
deserialize(resp3_set, adapt2(resp));
|
||||
|
||||
for (auto const& e : resp.value())
|
||||
std::cout << e << std::endl;
|
||||
@@ -256,12 +265,10 @@ BOOST_AUTO_TEST_CASE(multiplexer_push)
|
||||
{
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_response(resp);
|
||||
|
||||
mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n";
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
|
||||
boost::system::error_code ec;
|
||||
auto const ret = mpx.consume_next(ec);
|
||||
auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec);
|
||||
|
||||
BOOST_TEST(ret.first.value());
|
||||
BOOST_CHECK_EQUAL(ret.second, 16u);
|
||||
@@ -280,18 +287,19 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
|
||||
{
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_response(resp);
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
|
||||
std::string msg;
|
||||
// Only part of the message.
|
||||
mpx.get_read_buffer() = ">2\r\n+one\r";
|
||||
msg += ">2\r\n+one\r";
|
||||
|
||||
boost::system::error_code ec;
|
||||
auto ret = mpx.consume_next(ec);
|
||||
auto ret = mpx.consume_next(msg, ec);
|
||||
|
||||
BOOST_TEST(!ret.first.has_value());
|
||||
|
||||
mpx.get_read_buffer().append("\n+two\r\n");
|
||||
ret = mpx.consume_next(ec);
|
||||
msg += "\n+two\r\n";
|
||||
ret = mpx.consume_next(msg, ec);
|
||||
|
||||
BOOST_TEST(ret.first.value());
|
||||
BOOST_CHECK_EQUAL(ret.second, 16u);
|
||||
@@ -315,7 +323,7 @@ struct test_item {
|
||||
// to Redis.
|
||||
req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg");
|
||||
|
||||
elem_ptr = std::make_shared<multiplexer::elem>(req, any_adapter(resp).impl_.adapt_fn);
|
||||
elem_ptr = std::make_shared<multiplexer::elem>(req, any_adapter{resp});
|
||||
|
||||
elem_ptr->set_done_callback([this]() {
|
||||
done = true;
|
||||
@@ -378,20 +386,14 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
|
||||
BOOST_TEST(item2.done);
|
||||
BOOST_TEST(!item3.done);
|
||||
|
||||
// Simulates a socket read by putting some data in the read buffer.
|
||||
mpx.get_read_buffer().append("+one\r\n");
|
||||
|
||||
// Consumes the next message in the read buffer.
|
||||
boost::system::error_code ec;
|
||||
auto const ret = mpx.consume_next(ec);
|
||||
auto const ret = mpx.consume_next("+one\r\n", ec);
|
||||
|
||||
// The read operation should have been successfull.
|
||||
BOOST_TEST(ret.first.has_value());
|
||||
BOOST_TEST(ret.second != 0u);
|
||||
|
||||
// The read buffer should also be empty now
|
||||
BOOST_TEST(mpx.get_read_buffer().empty());
|
||||
|
||||
// The last request still did not get a response.
|
||||
BOOST_TEST(item1.done);
|
||||
BOOST_TEST(item2.done);
|
||||
@@ -399,3 +401,113 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
|
||||
|
||||
// TODO: Check the first request was removed from the queue.
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(read_buffer_prepare_error)
|
||||
{
|
||||
using boost::redis::detail::read_buffer;
|
||||
|
||||
read_buffer buf;
|
||||
|
||||
// Usual case, max size is bigger then requested size.
|
||||
buf.set_config({10, 10});
|
||||
auto ec = buf.prepare_append();
|
||||
BOOST_TEST(!ec);
|
||||
buf.commit_append(10);
|
||||
|
||||
// Corner case, max size is equal to the requested size.
|
||||
buf.set_config({10, 20});
|
||||
ec = buf.prepare_append();
|
||||
BOOST_TEST(!ec);
|
||||
buf.commit_append(10);
|
||||
buf.consume_committed(20);
|
||||
|
||||
auto const tmp = buf;
|
||||
|
||||
// Error case, max size is smaller to the requested size.
|
||||
buf.set_config({10, 9});
|
||||
ec = buf.prepare_append();
|
||||
BOOST_TEST(ec == error_code{boost::redis::error::exceeds_maximum_read_buffer_size});
|
||||
|
||||
// Check that an error call has no side effects.
|
||||
auto const res = buf == tmp;
|
||||
BOOST_TEST(res);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(read_buffer_prepare_consume_only_committed_data)
|
||||
{
|
||||
using boost::redis::detail::read_buffer;
|
||||
|
||||
read_buffer buf;
|
||||
|
||||
buf.set_config({10, 10});
|
||||
auto ec = buf.prepare_append();
|
||||
BOOST_TEST(!ec);
|
||||
|
||||
// No data has been committed yet so nothing can be consummed.
|
||||
BOOST_CHECK_EQUAL(buf.consume_committed(5), 0u);
|
||||
|
||||
buf.commit_append(10);
|
||||
|
||||
// All five bytes can be consumed.
|
||||
BOOST_CHECK_EQUAL(buf.consume_committed(5), 5u);
|
||||
|
||||
// Only the remaining five bytes can be consumed
|
||||
BOOST_CHECK_EQUAL(buf.consume_committed(7), 5u);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size)
|
||||
{
|
||||
using boost::redis::detail::read_buffer;
|
||||
|
||||
read_buffer buf;
|
||||
|
||||
buf.set_config({10, 10});
|
||||
auto ec = buf.prepare_append();
|
||||
BOOST_TEST(!ec);
|
||||
|
||||
BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(check_counter_adapter)
|
||||
{
|
||||
using boost::redis::any_adapter;
|
||||
using boost::redis::resp3::parse;
|
||||
using boost::redis::resp3::parser;
|
||||
using boost::redis::resp3::node_view;
|
||||
using boost::system::error_code;
|
||||
|
||||
int init = 0;
|
||||
int node = 0;
|
||||
int done = 0;
|
||||
|
||||
auto counter_adapter = [&](any_adapter::parse_event ev, node_view const&, error_code&) mutable {
|
||||
switch (ev) {
|
||||
case any_adapter::parse_event::init: init++; break;
|
||||
case any_adapter::parse_event::node: node++; break;
|
||||
case any_adapter::parse_event::done: done++; break;
|
||||
}
|
||||
};
|
||||
|
||||
any_adapter wrapped{any_adapter::impl_t{counter_adapter}};
|
||||
|
||||
error_code ec;
|
||||
parser p;
|
||||
|
||||
auto const ret1 = parse(p, RESP3_SET_PART1, wrapped, ec);
|
||||
auto const ret2 = parse(p, RESP3_SET_PART1 RESP3_SET_PART2, wrapped, ec);
|
||||
auto const ret3 = parse(p, RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3, wrapped, ec);
|
||||
auto const ret4 = parse(
|
||||
p,
|
||||
RESP3_SET_PART1 RESP3_SET_PART2 RESP3_SET_PART3 RESP3_SET_PART4,
|
||||
wrapped,
|
||||
ec);
|
||||
|
||||
BOOST_TEST(!ret1);
|
||||
BOOST_TEST(!ret2);
|
||||
BOOST_TEST(!ret3);
|
||||
BOOST_TEST(ret4);
|
||||
|
||||
BOOST_CHECK_EQUAL(init, 1);
|
||||
BOOST_CHECK_EQUAL(node, 7);
|
||||
BOOST_CHECK_EQUAL(done, 1);
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
|
||||
#include <boost/redis/detail/read_buffer.hpp>
|
||||
#include <boost/redis/detail/reader_fsm.hpp>
|
||||
|
||||
#include <boost/asio/cancellation_type.hpp>
|
||||
@@ -12,13 +13,17 @@
|
||||
#include <boost/core/lightweight_test.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
namespace net = boost::asio;
|
||||
namespace redis = boost::redis;
|
||||
using boost::system::error_code;
|
||||
using net::cancellation_type_t;
|
||||
using redis::detail::reader_fsm;
|
||||
using redis::detail::multiplexer;
|
||||
using redis::detail::read_buffer;
|
||||
using redis::generic_response;
|
||||
using redis::any_adapter;
|
||||
using action = redis::detail::reader_fsm::action;
|
||||
|
||||
namespace boost::redis::detail {
|
||||
@@ -37,10 +42,11 @@ namespace {
|
||||
|
||||
void test_push()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_response(resp);
|
||||
reader_fsm fsm{mpx};
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -51,13 +57,15 @@ void test_push()
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
|
||||
// The fsm is asking for data.
|
||||
mpx.get_read_buffer().append(">1\r\n+msg1\r\n");
|
||||
mpx.get_read_buffer().append(">1\r\n+msg2 \r\n");
|
||||
mpx.get_read_buffer().append(">1\r\n+msg3 \r\n");
|
||||
auto const bytes_read = mpx.get_read_buffer().size();
|
||||
std::string const payload =
|
||||
">1\r\n+msg1\r\n"
|
||||
">1\r\n+msg2 \r\n"
|
||||
">1\r\n+msg3 \r\n";
|
||||
|
||||
append_read_data(rbuf, payload);
|
||||
|
||||
// Deliver the 1st push
|
||||
act = fsm.resume(bytes_read, ec, cancellation_type_t::none);
|
||||
act = fsm.resume(payload.size(), ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
|
||||
BOOST_TEST_EQ(act.push_size_, 11u);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
@@ -82,10 +90,11 @@ void test_push()
|
||||
|
||||
void test_read_needs_more()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_response(resp);
|
||||
reader_fsm fsm{mpx};
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -100,20 +109,20 @@ void test_read_needs_more()
|
||||
std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"};
|
||||
|
||||
// Passes the first part to the fsm.
|
||||
mpx.get_read_buffer().append(msg[0]);
|
||||
append_read_data(rbuf, msg[0]);
|
||||
act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::needs_more);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
// Passes the second part to the fsm.
|
||||
mpx.get_read_buffer().append(msg[1]);
|
||||
append_read_data(rbuf, msg[1]);
|
||||
act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::needs_more);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
// Passes the third and last part to the fsm, next it should ask us
|
||||
// to deliver the message.
|
||||
mpx.get_read_buffer().append(msg[2]);
|
||||
append_read_data(rbuf, msg[2]);
|
||||
act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
|
||||
BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size());
|
||||
@@ -127,10 +136,11 @@ void test_read_needs_more()
|
||||
|
||||
void test_read_error()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_response(resp);
|
||||
reader_fsm fsm{mpx};
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -141,26 +151,27 @@ void test_read_error()
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
|
||||
// The fsm is asking for data.
|
||||
mpx.get_read_buffer().append(">1\r\n+msg1\r\n");
|
||||
auto const bytes_read = mpx.get_read_buffer().size();
|
||||
std::string const payload = ">1\r\n+msg1\r\n";
|
||||
append_read_data(rbuf, payload);
|
||||
|
||||
// Deliver the data
|
||||
act = fsm.resume(bytes_read, {net::error::operation_aborted}, cancellation_type_t::none);
|
||||
act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
// Finish
|
||||
act = fsm.resume(bytes_read, ec, cancellation_type_t::none);
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::done);
|
||||
BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted});
|
||||
}
|
||||
|
||||
void test_parse_error()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_response(resp);
|
||||
reader_fsm fsm{mpx};
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -171,26 +182,27 @@ void test_parse_error()
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
|
||||
// The fsm is asking for data.
|
||||
mpx.get_read_buffer().append(">a\r\n");
|
||||
auto const bytes_read = mpx.get_read_buffer().size();
|
||||
std::string const payload = ">a\r\n";
|
||||
append_read_data(rbuf, payload);
|
||||
|
||||
// Deliver the data
|
||||
act = fsm.resume(bytes_read, {}, cancellation_type_t::none);
|
||||
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
// Finish
|
||||
act = fsm.resume(bytes_read, {}, cancellation_type_t::none);
|
||||
act = fsm.resume(0, {}, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::done);
|
||||
BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number});
|
||||
}
|
||||
|
||||
void test_push_deliver_error()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_response(resp);
|
||||
reader_fsm fsm{mpx};
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
@@ -201,16 +213,16 @@ void test_push_deliver_error()
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
|
||||
// The fsm is asking for data.
|
||||
mpx.get_read_buffer().append(">1\r\n+msg1\r\n");
|
||||
auto const bytes_read = mpx.get_read_buffer().size();
|
||||
std::string const payload = ">1\r\n+msg1\r\n";
|
||||
append_read_data(rbuf, payload);
|
||||
|
||||
// Deliver the data
|
||||
act = fsm.resume(bytes_read, {}, cancellation_type_t::none);
|
||||
act = fsm.resume(payload.size(), {}, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
// Resumes from notifying a push with an error.
|
||||
act = fsm.resume(bytes_read, net::error::operation_aborted, cancellation_type_t::none);
|
||||
act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
|
||||
|
||||
// Finish
|
||||
@@ -219,10 +231,40 @@ void test_push_deliver_error()
|
||||
BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted});
|
||||
}
|
||||
|
||||
void test_max_read_buffer_size()
|
||||
{
|
||||
read_buffer rbuf;
|
||||
rbuf.set_config({5, 7});
|
||||
multiplexer mpx;
|
||||
generic_response resp;
|
||||
mpx.set_receive_adapter(any_adapter{resp});
|
||||
reader_fsm fsm{rbuf, mpx};
|
||||
error_code ec;
|
||||
action act;
|
||||
|
||||
// Initiate
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::setup_cancellation);
|
||||
act = fsm.resume(0, ec, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::append_some);
|
||||
|
||||
// Passes the first part to the fsm.
|
||||
std::string const part1 = ">3\r\n";
|
||||
append_read_data(rbuf, part1);
|
||||
act = fsm.resume(part1.size(), {}, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::cancel_run);
|
||||
BOOST_TEST_EQ(act.ec_, error_code());
|
||||
|
||||
act = fsm.resume({}, {}, cancellation_type_t::none);
|
||||
BOOST_TEST_EQ(act.type_, action::type::done);
|
||||
BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
int main()
|
||||
{
|
||||
test_max_read_buffer_size();
|
||||
test_push_deliver_error();
|
||||
test_read_needs_more();
|
||||
test_push();
|
||||
|
||||
Reference in New Issue
Block a user