mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Fixes issue 50 and 44.
This commit is contained in:
@@ -299,6 +299,15 @@ if (MSVC)
|
||||
target_compile_definitions(test_request PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(test_issue_50 tests/issue_50.cpp)
|
||||
target_compile_features(test_issue_50 PUBLIC cxx_std_20)
|
||||
target_link_libraries(test_issue_50 common)
|
||||
add_test(test_issue_50 test_issue_50)
|
||||
if (MSVC)
|
||||
target_compile_options(test_issue_50 PRIVATE /bigobj)
|
||||
target_compile_definitions(test_issue_50 PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
# Install
|
||||
#=======================================================================
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ connection to read Redis
|
||||
```cpp
|
||||
auto co_main() -> net::awaitable<void>
|
||||
{
|
||||
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
|
||||
connection conn{co_await net::this_coro::executor};
|
||||
|
||||
// From examples/common.hpp to avoid vebosity
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
@@ -65,7 +65,7 @@ auto co_main() -> net::awaitable<void>
|
||||
std::tuple<ignore, std::map<std::string, std::string>, ignore> resp;
|
||||
|
||||
// Executes the request. See below why we are using operator ||.
|
||||
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
|
||||
co_await (conn.async_run() || conn.async_exec(req, adapt(resp)));
|
||||
// Use the map from std::get<1>(resp) ...
|
||||
}
|
||||
```
|
||||
|
||||
@@ -40,8 +40,6 @@ namespace detail
|
||||
|
||||
class ignore_adapter {
|
||||
public:
|
||||
explicit ignore_adapter(std::size_t max_read_size) : max_read_size_{max_read_size} {}
|
||||
|
||||
void
|
||||
operator()(
|
||||
std::size_t, resp3::node<std::string_view> const&, boost::system::error_code&) { }
|
||||
@@ -49,13 +47,6 @@ public:
|
||||
[[nodiscard]]
|
||||
auto get_supported_response_size() const noexcept
|
||||
{ return static_cast<std::size_t>(-1);}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_max_read_size(std::size_t) const noexcept
|
||||
{ return max_read_size_;}
|
||||
|
||||
private:
|
||||
std::size_t max_read_size_;
|
||||
};
|
||||
|
||||
template <class Tuple>
|
||||
@@ -67,11 +58,9 @@ private:
|
||||
using adapters_array_type = std::array<variant_type, size>;
|
||||
|
||||
adapters_array_type adapters_;
|
||||
std::size_t max_read_size_;
|
||||
|
||||
public:
|
||||
explicit static_adapter(Tuple& r, std::size_t max_read_size)
|
||||
: max_read_size_{max_read_size}
|
||||
explicit static_adapter(Tuple& r)
|
||||
{
|
||||
adapter::detail::assigner<size - 1>::assign(adapters_, r);
|
||||
}
|
||||
@@ -80,10 +69,6 @@ public:
|
||||
auto get_supported_response_size() const noexcept
|
||||
{ return size;}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_max_read_size(std::size_t) const noexcept
|
||||
{ return max_read_size_;}
|
||||
|
||||
void
|
||||
operator()(
|
||||
std::size_t i,
|
||||
@@ -102,12 +87,10 @@ class vector_adapter {
|
||||
private:
|
||||
using adapter_type = typename adapter::detail::response_traits<Vector>::adapter_type;
|
||||
adapter_type adapter_;
|
||||
std::size_t max_read_size_;
|
||||
|
||||
public:
|
||||
explicit vector_adapter(Vector& v, std::size_t max_read_size)
|
||||
explicit vector_adapter(Vector& v)
|
||||
: adapter_{adapter::adapt2(v)}
|
||||
, max_read_size_{max_read_size}
|
||||
{ }
|
||||
|
||||
[[nodiscard]]
|
||||
@@ -115,10 +98,6 @@ public:
|
||||
get_supported_response_size() const noexcept
|
||||
{ return static_cast<std::size_t>(-1);}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_max_read_size(std::size_t) const noexcept
|
||||
{ return max_read_size_;}
|
||||
|
||||
void
|
||||
operator()(
|
||||
std::size_t,
|
||||
@@ -137,8 +116,8 @@ struct response_traits<void> {
|
||||
using response_type = void;
|
||||
using adapter_type = detail::ignore_adapter;
|
||||
|
||||
static auto adapt(std::size_t max_read_size) noexcept
|
||||
{ return detail::ignore_adapter{max_read_size}; }
|
||||
static auto adapt() noexcept
|
||||
{ return detail::ignore_adapter{}; }
|
||||
};
|
||||
|
||||
template <class String, class Allocator>
|
||||
@@ -146,8 +125,8 @@ struct response_traits<std::vector<resp3::node<String>, Allocator>> {
|
||||
using response_type = std::vector<resp3::node<String>, Allocator>;
|
||||
using adapter_type = vector_adapter<response_type>;
|
||||
|
||||
static auto adapt(response_type& v, std::size_t max_read_size) noexcept
|
||||
{ return adapter_type{v, max_read_size}; }
|
||||
static auto adapt(response_type& v) noexcept
|
||||
{ return adapter_type{v}; }
|
||||
};
|
||||
|
||||
template <class ...Ts>
|
||||
@@ -155,8 +134,8 @@ struct response_traits<std::tuple<Ts...>> {
|
||||
using response_type = std::tuple<Ts...>;
|
||||
using adapter_type = static_adapter<response_type>;
|
||||
|
||||
static auto adapt(response_type& r, std::size_t max_read_size) noexcept
|
||||
{ return adapter_type{r, max_read_size}; }
|
||||
static auto adapt(response_type& r) noexcept
|
||||
{ return adapter_type{r}; }
|
||||
};
|
||||
|
||||
template <class Adapter>
|
||||
@@ -171,10 +150,6 @@ public:
|
||||
auto get_supported_response_size() const noexcept
|
||||
{ return adapter_.get_supported_response_size();}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_max_read_size(std::size_t) const noexcept
|
||||
{ return adapter_.get_max_read_size(0); }
|
||||
|
||||
private:
|
||||
Adapter adapter_;
|
||||
};
|
||||
@@ -192,13 +167,10 @@ auto make_adapter_wrapper(Adapter adapter)
|
||||
*
|
||||
* This function can be used to create adapters that ignores
|
||||
* responses.
|
||||
*
|
||||
* @param max_read_size Specifies the maximum size of the read
|
||||
* buffer.
|
||||
*/
|
||||
inline auto adapt(std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)()) noexcept
|
||||
inline auto adapt() noexcept
|
||||
{
|
||||
return detail::response_traits<void>::adapt(max_read_size);
|
||||
return detail::response_traits<void>::adapt();
|
||||
}
|
||||
|
||||
/** @brief Adapts a type to be used as a response.
|
||||
@@ -213,13 +185,11 @@ inline auto adapt(std::size_t max_read_size = (std::numeric_limits<std::size_t>:
|
||||
* and `std::string`.
|
||||
*
|
||||
* @param t Tuple containing the responses.
|
||||
* @param max_read_size Specifies the maximum size of the read
|
||||
* buffer.
|
||||
*/
|
||||
template<class T>
|
||||
auto adapt(T& t, std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)()) noexcept
|
||||
auto adapt(T& t) noexcept
|
||||
{
|
||||
return detail::response_traits<T>::adapt(t, max_read_size);
|
||||
return detail::response_traits<T>::adapt(t);
|
||||
}
|
||||
|
||||
} // aedis
|
||||
|
||||
@@ -187,15 +187,21 @@ public:
|
||||
auto cancel(operation op) -> std::size_t
|
||||
{ return base_type::cancel(op); }
|
||||
|
||||
/// Sets the maximum size of the read buffer.
|
||||
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
|
||||
{ base_type::set_max_buffer_read_size(max_read_size); }
|
||||
|
||||
private:
|
||||
using this_type = basic_connection<next_layer_type>;
|
||||
|
||||
template <class, class> friend class detail::connection_base;
|
||||
template <class, class> friend struct detail::exec_read_op;
|
||||
template <class, class> friend struct detail::exec_op;
|
||||
template <class, class> friend struct detail::receive_op;
|
||||
template <class> friend struct detail::reader_op;
|
||||
template <class> friend struct detail::writer_op;
|
||||
template <class> friend struct detail::run_op;
|
||||
template <class> friend struct detail::wait_receive_op;
|
||||
|
||||
void close() { stream_.close(); }
|
||||
auto is_open() const noexcept { return stream_.is_open(); }
|
||||
|
||||
@@ -47,7 +47,7 @@ public:
|
||||
connection_base(executor_type ex, std::pmr::memory_resource* resource)
|
||||
: writer_timer_{ex}
|
||||
, read_timer_{ex}
|
||||
, guarded_op_{ex}
|
||||
, channel_{ex}
|
||||
, read_buffer_{resource}
|
||||
, write_buffer_{resource}
|
||||
, reqs_{resource}
|
||||
@@ -76,7 +76,7 @@ public:
|
||||
}
|
||||
case operation::receive:
|
||||
{
|
||||
guarded_op_.cancel();
|
||||
channel_.cancel();
|
||||
return 1U;
|
||||
}
|
||||
default: BOOST_ASSERT(false); return 0;
|
||||
@@ -136,7 +136,7 @@ public:
|
||||
template <class Adapter, class CompletionToken>
|
||||
auto async_exec(resp3::request const& req, Adapter adapter, CompletionToken token)
|
||||
{
|
||||
BOOST_ASSERT_MSG(req.size() <= adapter.get_supported_response_size(), "Request and adapter have incompatible sizes.");
|
||||
BOOST_ASSERT_MSG(req.size() <= adapter.get_supported_response_size(), "Request and response have incompatible sizes.");
|
||||
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
@@ -149,9 +149,10 @@ public:
|
||||
{
|
||||
auto f = detail::make_adapter_wrapper(adapter);
|
||||
|
||||
return guarded_op_.async_wait(
|
||||
resp3::async_read(derived().next_layer(), make_dynamic_buffer(adapter.get_max_read_size(0)), f, boost::asio::deferred),
|
||||
std::move(token));
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code, std::size_t)
|
||||
>(detail::receive_op<Derived, decltype(f)>{&derived(), f}, token, channel_);
|
||||
}
|
||||
|
||||
template <class CompletionToken>
|
||||
@@ -163,10 +164,14 @@ public:
|
||||
>(detail::run_op<Derived>{&derived()}, token, writer_timer_);
|
||||
}
|
||||
|
||||
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
|
||||
{max_read_size_ = max_read_size;}
|
||||
|
||||
private:
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
using clock_traits_type = boost::asio::wait_traits<clock_type>;
|
||||
using timer_type = boost::asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
|
||||
using channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, std::size_t)>;
|
||||
|
||||
auto derived() -> Derived& { return static_cast<Derived&>(*this); }
|
||||
|
||||
@@ -272,7 +277,17 @@ private:
|
||||
template <class> friend struct detail::run_op;
|
||||
template <class, class> friend struct detail::exec_op;
|
||||
template <class, class> friend struct detail::exec_read_op;
|
||||
template <class> friend struct detail::send_receive_op;
|
||||
template <class, class> friend struct detail::receive_op;
|
||||
template <class> friend struct detail::wait_receive_op;
|
||||
|
||||
template <class CompletionToken>
|
||||
auto async_wait_receive(CompletionToken token)
|
||||
{
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(wait_receive_op<Derived>{&derived()}, token, channel_);
|
||||
}
|
||||
|
||||
void cancel_push_requests()
|
||||
{
|
||||
@@ -299,12 +314,12 @@ private:
|
||||
std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
|
||||
}
|
||||
|
||||
if (derived().is_open() && cmds_ == 0 && write_buffer_.empty())
|
||||
if (derived().is_open() && !is_waiting_response() && write_buffer_.empty())
|
||||
writer_timer_.cancel();
|
||||
}
|
||||
|
||||
auto make_dynamic_buffer(std::size_t max_read_size = 512)
|
||||
{ return boost::asio::dynamic_buffer(read_buffer_, max_read_size); }
|
||||
auto make_dynamic_buffer()
|
||||
{ return boost::asio::dynamic_buffer(read_buffer_, max_read_size_); }
|
||||
|
||||
template <class CompletionToken>
|
||||
auto reader(CompletionToken&& token)
|
||||
@@ -336,7 +351,6 @@ private:
|
||||
void stage_request(req_info& ri)
|
||||
{
|
||||
write_buffer_ += ri.get_request().payload();
|
||||
cmds_ += ri.get_request().size();
|
||||
ri.mark_staged();
|
||||
}
|
||||
|
||||
@@ -358,17 +372,22 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
bool is_waiting_response() const noexcept
|
||||
{
|
||||
return !std::empty(reqs_) && reqs_.front()->is_written();
|
||||
}
|
||||
|
||||
// Notice we use a timer to simulate a condition-variable. It is
|
||||
// also more suitable than a channel and the notify operation does
|
||||
// not suspend.
|
||||
timer_type writer_timer_;
|
||||
timer_type read_timer_;
|
||||
detail::guarded_operation<executor_type> guarded_op_;
|
||||
channel_type channel_;
|
||||
|
||||
std::pmr::string read_buffer_;
|
||||
std::pmr::string write_buffer_;
|
||||
std::size_t cmds_ = 0;
|
||||
reqs_type reqs_;
|
||||
std::size_t max_read_size_ = (std::numeric_limits<std::size_t>::max)();
|
||||
};
|
||||
|
||||
} // aedis
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
|
||||
#include <aedis/adapt.hpp>
|
||||
#include <aedis/error.hpp>
|
||||
#include <aedis/detail/guarded_operation.hpp>
|
||||
#include <aedis/resp3/type.hpp>
|
||||
#include <aedis/resp3/detail/parser.hpp>
|
||||
#include <aedis/resp3/read.hpp>
|
||||
@@ -28,6 +27,30 @@
|
||||
|
||||
namespace aedis::detail {
|
||||
|
||||
template <class Conn>
|
||||
struct wait_receive_op {
|
||||
Conn* conn;
|
||||
boost::asio::coroutine coro{};
|
||||
|
||||
template <class Self>
|
||||
void
|
||||
operator()(Self& self , boost::system::error_code ec = {})
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->channel_.async_send(boost::system::error_code{}, 0, std::move(self));
|
||||
AEDIS_CHECK_OP0(;);
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->channel_.async_send(boost::system::error_code{}, 0, std::move(self));
|
||||
AEDIS_CHECK_OP0(;);
|
||||
|
||||
self.complete({});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class Conn, class Adapter>
|
||||
struct exec_read_op {
|
||||
Conn* conn;
|
||||
@@ -48,7 +71,7 @@ struct exec_read_op {
|
||||
// Loop reading the responses to this request.
|
||||
BOOST_ASSERT(!conn->reqs_.empty());
|
||||
while (cmds != 0) {
|
||||
BOOST_ASSERT(conn->cmds_ != 0);
|
||||
BOOST_ASSERT(conn->is_waiting_response());
|
||||
|
||||
//-----------------------------------
|
||||
// If we detect a push in the middle of a request we have
|
||||
@@ -67,7 +90,7 @@ struct exec_read_op {
|
||||
// the receive_op wait for it to be done and continue.
|
||||
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push) {
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->guarded_op_.async_run(std::move(self));
|
||||
conn->async_wait_receive(std::move(self));
|
||||
AEDIS_CHECK_OP1(conn->cancel(operation::run););
|
||||
continue;
|
||||
}
|
||||
@@ -76,7 +99,7 @@ struct exec_read_op {
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
resp3::async_read(
|
||||
conn->next_layer(),
|
||||
conn->make_dynamic_buffer(adapter.get_max_read_size(index)),
|
||||
conn->make_dynamic_buffer(),
|
||||
[i = index, adpt = adapter] (resp3::node<std::string_view> const& nd, boost::system::error_code& ec) mutable { adpt(i, nd, ec); },
|
||||
std::move(self));
|
||||
|
||||
@@ -88,9 +111,6 @@ struct exec_read_op {
|
||||
|
||||
BOOST_ASSERT(cmds != 0);
|
||||
--cmds;
|
||||
|
||||
BOOST_ASSERT(conn->cmds_ != 0);
|
||||
--conn->cmds_;
|
||||
}
|
||||
|
||||
self.complete({}, read_size);
|
||||
@@ -98,6 +118,46 @@ struct exec_read_op {
|
||||
}
|
||||
};
|
||||
|
||||
template <class Conn, class Adapter>
|
||||
struct receive_op {
|
||||
Conn* conn;
|
||||
Adapter adapter;
|
||||
std::size_t read_size = 0;
|
||||
boost::asio::coroutine coro{};
|
||||
|
||||
template <class Self>
|
||||
void
|
||||
operator()( Self& self
|
||||
, boost::system::error_code ec = {}
|
||||
, std::size_t n = 0)
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->channel_.async_receive(std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
resp3::async_read(conn->next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self));
|
||||
if (ec || is_cancelled(self)) {
|
||||
conn->cancel(operation::run);
|
||||
conn->cancel(operation::receive);
|
||||
self.complete(!!ec ? ec : boost::asio::error::operation_aborted, {});
|
||||
return;
|
||||
}
|
||||
|
||||
read_size = n;
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->channel_.async_receive(std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
|
||||
self.complete({}, read_size);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class Conn, class Adapter>
|
||||
struct exec_op {
|
||||
using req_info_type = typename Conn::req_info;
|
||||
@@ -169,7 +229,6 @@ EXEC_OP_WAIT:
|
||||
|
||||
BOOST_ASSERT(!conn->reqs_.empty());
|
||||
BOOST_ASSERT(conn->reqs_.front() != nullptr);
|
||||
BOOST_ASSERT(conn->cmds_ != 0);
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->async_exec_read(adapter, conn->reqs_.front()->get_number_of_commands(), std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
@@ -179,7 +238,7 @@ EXEC_OP_WAIT:
|
||||
BOOST_ASSERT(!conn->reqs_.empty());
|
||||
conn->reqs_.pop_front();
|
||||
|
||||
if (conn->cmds_ == 0) {
|
||||
if (!conn->is_waiting_response()) {
|
||||
conn->read_timer_.cancel_one();
|
||||
if (!conn->reqs_.empty())
|
||||
conn->writer_timer_.cancel_one();
|
||||
@@ -207,7 +266,7 @@ struct run_op {
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
conn->write_buffer_.clear();
|
||||
conn->cmds_ = 0;
|
||||
conn->read_buffer_.clear();
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::experimental::make_parallel_group(
|
||||
@@ -245,7 +304,7 @@ struct writer_op {
|
||||
|
||||
BOOST_ASIO_CORO_REENTER (coro) for (;;)
|
||||
{
|
||||
while (!conn->reqs_.empty() && conn->cmds_ == 0 && conn->write_buffer_.empty()) {
|
||||
while (!conn->reqs_.empty() && !conn->is_waiting_response() && conn->write_buffer_.empty()) {
|
||||
conn->coalesce_requests();
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::async_write(conn->next_layer(), boost::asio::buffer(conn->write_buffer_), std::move(self));
|
||||
@@ -324,9 +383,9 @@ struct reader_op {
|
||||
|| conn->reqs_.empty()
|
||||
|| (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)) {
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->guarded_op_.async_run(std::move(self));
|
||||
conn->async_wait_receive(std::move(self));
|
||||
} else {
|
||||
BOOST_ASSERT(conn->cmds_ != 0);
|
||||
BOOST_ASSERT(conn->is_waiting_response());
|
||||
BOOST_ASSERT(!conn->reqs_.empty());
|
||||
BOOST_ASSERT(conn->reqs_.front()->get_number_of_commands() != 0);
|
||||
conn->reqs_.front()->proceed();
|
||||
|
||||
@@ -1,111 +0,0 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#ifndef AEDIS_DETAIL_GUARDED_OPERATION_HPP
|
||||
#define AEDIS_DETAIL_GUARDED_OPERATION_HPP
|
||||
|
||||
#include <boost/asio/experimental/channel.hpp>
|
||||
|
||||
namespace aedis::detail {
|
||||
|
||||
template <class Executor>
|
||||
struct send_receive_op {
|
||||
using channel_type = boost::asio::experimental::channel<Executor, void(boost::system::error_code, std::size_t)>;
|
||||
channel_type* channel;
|
||||
boost::asio::coroutine coro{};
|
||||
|
||||
template <class Self>
|
||||
void operator()(Self& self, boost::system::error_code ec = {})
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
channel->async_send(boost::system::error_code{}, 0, std::move(self));
|
||||
AEDIS_CHECK_OP0(;);
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
channel->async_send(boost::system::error_code{}, 0, std::move(self));
|
||||
AEDIS_CHECK_OP0(;);
|
||||
|
||||
self.complete({});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class Executor, class Op>
|
||||
struct wait_op {
|
||||
using channel_type = boost::asio::experimental::channel<Executor, void(boost::system::error_code, std::size_t)>;
|
||||
channel_type* channel;
|
||||
Op op;
|
||||
std::size_t res = 0;
|
||||
boost::asio::coroutine coro{};
|
||||
|
||||
template <class Self>
|
||||
void
|
||||
operator()( Self& self
|
||||
, boost::system::error_code ec = {}
|
||||
, std::size_t n = 0)
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
channel->async_receive(std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
std::move(op)(std::move(self));
|
||||
AEDIS_CHECK_OP1(channel->cancel(););
|
||||
|
||||
res = n;
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
channel->async_receive(std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
|
||||
self.complete({}, res);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class Executor>
|
||||
class guarded_operation {
|
||||
public:
|
||||
using executor_type = Executor;
|
||||
guarded_operation(executor_type ex) : channel_{ex} {}
|
||||
|
||||
template <class CompletionToken>
|
||||
auto async_run(CompletionToken&& token)
|
||||
{
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(send_receive_op<executor_type>{&channel_}, token, channel_);
|
||||
}
|
||||
|
||||
template <class Op, class CompletionToken>
|
||||
auto async_wait(Op&& op, CompletionToken token)
|
||||
{
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code, std::size_t)
|
||||
>(wait_op<executor_type, Op>{&channel_, std::move(op)}, token, channel_);
|
||||
}
|
||||
|
||||
void cancel() {channel_.cancel();}
|
||||
|
||||
private:
|
||||
using channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, std::size_t)>;
|
||||
|
||||
template <class> friend struct send_receive_op;
|
||||
template <class, class> friend struct wait_op;
|
||||
|
||||
channel_type channel_;
|
||||
};
|
||||
|
||||
} // aedis::detail
|
||||
|
||||
#endif // AEDIS_DETAIL_GUARDED_OPERATION_HPP
|
||||
@@ -134,15 +134,21 @@ public:
|
||||
|
||||
auto& lowest_layer() noexcept { return stream_.lowest_layer(); }
|
||||
|
||||
/// Sets the maximum size of the read buffer.
|
||||
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
|
||||
{ base_type::set_max_buffer_read_size(max_read_size); }
|
||||
|
||||
private:
|
||||
using this_type = basic_connection<next_layer_type>;
|
||||
|
||||
template <class, class> friend class aedis::detail::connection_base;
|
||||
template <class, class> friend struct aedis::detail::exec_op;
|
||||
template <class, class> friend struct aedis::detail::exec_read_op;
|
||||
template <class, class> friend struct detail::receive_op;
|
||||
template <class> friend struct aedis::detail::run_op;
|
||||
template <class> friend struct aedis::detail::writer_op;
|
||||
template <class> friend struct aedis::detail::reader_op;
|
||||
template <class> friend struct detail::wait_receive_op;
|
||||
|
||||
auto is_open() const noexcept { return stream_.next_layer().is_open(); }
|
||||
void close() { stream_.next_layer().close(); }
|
||||
|
||||
@@ -108,10 +108,6 @@ struct adapter_error {
|
||||
[[nodiscard]]
|
||||
auto get_supported_response_size() const noexcept
|
||||
{ return static_cast<std::size_t>(-1);}
|
||||
|
||||
[[nodiscard]]
|
||||
auto get_max_read_size(std::size_t) const noexcept
|
||||
{ return static_cast<std::size_t>(-1);}
|
||||
};
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_push_adapter)
|
||||
|
||||
@@ -168,31 +168,6 @@ BOOST_AUTO_TEST_CASE(reset_before_run_completes)
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
using slave_operation = aedis::detail::guarded_operation<net::any_io_executor>;
|
||||
|
||||
auto master(std::shared_ptr<slave_operation> op) -> net::awaitable<void>
|
||||
{
|
||||
co_await op->async_run(net::use_awaitable);
|
||||
}
|
||||
|
||||
auto slave(std::shared_ptr<slave_operation> op) -> net::awaitable<void>
|
||||
{
|
||||
net::steady_timer timer{co_await net::this_coro::executor};
|
||||
timer.expires_after(std::chrono::seconds{1});
|
||||
|
||||
co_await op->async_wait(timer.async_wait(net::deferred), net::use_awaitable);
|
||||
std::cout << "Kabuf" << std::endl;
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(slave_op)
|
||||
{
|
||||
net::io_context ioc;
|
||||
auto op = std::make_shared<slave_operation>(ioc.get_executor());
|
||||
net::co_spawn(ioc, master(op), net::detached);
|
||||
net::co_spawn(ioc, slave(op), net::detached);
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
#else
|
||||
int main(){}
|
||||
#endif
|
||||
|
||||
74
tests/issue_50.cpp
Normal file
74
tests/issue_50.cpp
Normal file
@@ -0,0 +1,74 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
|
||||
#include <aedis.hpp>
|
||||
#include <boost/asio/experimental/awaitable_operators.hpp>
|
||||
|
||||
#include "../examples/common/common.hpp"
|
||||
|
||||
namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
using namespace net::experimental::awaitable_operators;
|
||||
using steady_timer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
|
||||
using aedis::adapt;
|
||||
|
||||
// Push consumer
|
||||
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
for (;;)
|
||||
co_await conn->async_receive();
|
||||
}
|
||||
|
||||
auto periodic_task(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
net::steady_timer timer{co_await net::this_coro::executor};
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
timer.expires_after(std::chrono::seconds(2));
|
||||
co_await timer.async_wait(net::use_awaitable);
|
||||
|
||||
// Key is not set so it will cause an error since we are passing
|
||||
// an adapter that does not accept null, this will cause an error
|
||||
// that result in the connection being closed.
|
||||
resp3::request req;
|
||||
req.push("GET", "mykey");
|
||||
std::tuple<std::string> response;
|
||||
auto [ec, u] = co_await conn->async_exec(req, adapt(response),
|
||||
net::as_tuple(net::use_awaitable));
|
||||
if (ec) {
|
||||
std::cout << "Error: " << ec << std::endl;
|
||||
} else {
|
||||
std::cout << "Response is: " << std::get<0>(response) << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << "Periodic task done!" << std::endl;
|
||||
}
|
||||
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
auto conn = std::make_shared<connection>(ex);
|
||||
steady_timer timer{ex};
|
||||
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("SUBSCRIBE", "channel");
|
||||
|
||||
// The loop will reconnect on connection lost. To exit type Ctrl-C twice.
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
co_await connect(conn, host, port);
|
||||
co_await ((conn->async_run() || receiver(conn) || healthy_checker(conn) || periodic_task(conn)) &&
|
||||
conn->async_exec(req));
|
||||
|
||||
conn->reset_stream();
|
||||
timer.expires_after(std::chrono::seconds{1});
|
||||
co_await timer.async_wait();
|
||||
}
|
||||
}
|
||||
|
||||
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
|
||||
Reference in New Issue
Block a user