diff --git a/CMakeLists.txt b/CMakeLists.txt index 56e61793..69994349 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -299,6 +299,15 @@ if (MSVC) target_compile_definitions(test_request PRIVATE _WIN32_WINNT=0x0601) endif() +add_executable(test_issue_50 tests/issue_50.cpp) +target_compile_features(test_issue_50 PUBLIC cxx_std_20) +target_link_libraries(test_issue_50 common) +add_test(test_issue_50 test_issue_50) +if (MSVC) + target_compile_options(test_issue_50 PRIVATE /bigobj) + target_compile_definitions(test_issue_50 PRIVATE _WIN32_WINNT=0x0601) +endif() + # Install #======================================================================= diff --git a/README.md b/README.md index 9aca6a94..d15e4b5a 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ connection to read Redis ```cpp auto co_main() -> net::awaitable { - auto conn = std::make_shared(co_await net::this_coro::executor); + connection conn{co_await net::this_coro::executor}; // From examples/common.hpp to avoid vebosity co_await connect(conn, "127.0.0.1", "6379"); @@ -65,7 +65,7 @@ auto co_main() -> net::awaitable std::tuple, ignore> resp; // Executes the request. See below why we are using operator ||. - co_await (conn->async_run() || conn->async_exec(req, adapt(resp))); + co_await (conn.async_run() || conn.async_exec(req, adapt(resp))); // Use the map from std::get<1>(resp) ... } ``` diff --git a/include/aedis/adapt.hpp b/include/aedis/adapt.hpp index 4cd5e5a2..54aa1927 100644 --- a/include/aedis/adapt.hpp +++ b/include/aedis/adapt.hpp @@ -40,8 +40,6 @@ namespace detail class ignore_adapter { public: - explicit ignore_adapter(std::size_t max_read_size) : max_read_size_{max_read_size} {} - void operator()( std::size_t, resp3::node const&, boost::system::error_code&) { } @@ -49,13 +47,6 @@ public: [[nodiscard]] auto get_supported_response_size() const noexcept { return static_cast(-1);} - - [[nodiscard]] - auto get_max_read_size(std::size_t) const noexcept - { return max_read_size_;} - -private: - std::size_t max_read_size_; }; template @@ -67,11 +58,9 @@ private: using adapters_array_type = std::array; adapters_array_type adapters_; - std::size_t max_read_size_; public: - explicit static_adapter(Tuple& r, std::size_t max_read_size) - : max_read_size_{max_read_size} + explicit static_adapter(Tuple& r) { adapter::detail::assigner::assign(adapters_, r); } @@ -80,10 +69,6 @@ public: auto get_supported_response_size() const noexcept { return size;} - [[nodiscard]] - auto get_max_read_size(std::size_t) const noexcept - { return max_read_size_;} - void operator()( std::size_t i, @@ -102,12 +87,10 @@ class vector_adapter { private: using adapter_type = typename adapter::detail::response_traits::adapter_type; adapter_type adapter_; - std::size_t max_read_size_; public: - explicit vector_adapter(Vector& v, std::size_t max_read_size) + explicit vector_adapter(Vector& v) : adapter_{adapter::adapt2(v)} - , max_read_size_{max_read_size} { } [[nodiscard]] @@ -115,10 +98,6 @@ public: get_supported_response_size() const noexcept { return static_cast(-1);} - [[nodiscard]] - auto get_max_read_size(std::size_t) const noexcept - { return max_read_size_;} - void operator()( std::size_t, @@ -137,8 +116,8 @@ struct response_traits { using response_type = void; using adapter_type = detail::ignore_adapter; - static auto adapt(std::size_t max_read_size) noexcept - { return detail::ignore_adapter{max_read_size}; } + static auto adapt() noexcept + { return detail::ignore_adapter{}; } }; template @@ -146,8 +125,8 @@ struct response_traits, Allocator>> { using response_type = std::vector, Allocator>; using adapter_type = vector_adapter; - static auto adapt(response_type& v, std::size_t max_read_size) noexcept - { return adapter_type{v, max_read_size}; } + static auto adapt(response_type& v) noexcept + { return adapter_type{v}; } }; template @@ -155,8 +134,8 @@ struct response_traits> { using response_type = std::tuple; using adapter_type = static_adapter; - static auto adapt(response_type& r, std::size_t max_read_size) noexcept - { return adapter_type{r, max_read_size}; } + static auto adapt(response_type& r) noexcept + { return adapter_type{r}; } }; template @@ -171,10 +150,6 @@ public: auto get_supported_response_size() const noexcept { return adapter_.get_supported_response_size();} - [[nodiscard]] - auto get_max_read_size(std::size_t) const noexcept - { return adapter_.get_max_read_size(0); } - private: Adapter adapter_; }; @@ -192,13 +167,10 @@ auto make_adapter_wrapper(Adapter adapter) * * This function can be used to create adapters that ignores * responses. - * - * @param max_read_size Specifies the maximum size of the read - * buffer. */ -inline auto adapt(std::size_t max_read_size = (std::numeric_limits::max)()) noexcept +inline auto adapt() noexcept { - return detail::response_traits::adapt(max_read_size); + return detail::response_traits::adapt(); } /** @brief Adapts a type to be used as a response. @@ -213,13 +185,11 @@ inline auto adapt(std::size_t max_read_size = (std::numeric_limits: * and `std::string`. * * @param t Tuple containing the responses. - * @param max_read_size Specifies the maximum size of the read - * buffer. */ template -auto adapt(T& t, std::size_t max_read_size = (std::numeric_limits::max)()) noexcept +auto adapt(T& t) noexcept { - return detail::response_traits::adapt(t, max_read_size); + return detail::response_traits::adapt(t); } } // aedis diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index 4d5c8b17..7ad8f610 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -187,15 +187,21 @@ public: auto cancel(operation op) -> std::size_t { return base_type::cancel(op); } + /// Sets the maximum size of the read buffer. + void set_max_buffer_read_size(std::size_t max_read_size) noexcept + { base_type::set_max_buffer_read_size(max_read_size); } + private: using this_type = basic_connection; template friend class detail::connection_base; template friend struct detail::exec_read_op; template friend struct detail::exec_op; + template friend struct detail::receive_op; template friend struct detail::reader_op; template friend struct detail::writer_op; template friend struct detail::run_op; + template friend struct detail::wait_receive_op; void close() { stream_.close(); } auto is_open() const noexcept { return stream_.is_open(); } diff --git a/include/aedis/detail/connection_base.hpp b/include/aedis/detail/connection_base.hpp index bf6c3289..45ffca85 100644 --- a/include/aedis/detail/connection_base.hpp +++ b/include/aedis/detail/connection_base.hpp @@ -47,7 +47,7 @@ public: connection_base(executor_type ex, std::pmr::memory_resource* resource) : writer_timer_{ex} , read_timer_{ex} - , guarded_op_{ex} + , channel_{ex} , read_buffer_{resource} , write_buffer_{resource} , reqs_{resource} @@ -76,7 +76,7 @@ public: } case operation::receive: { - guarded_op_.cancel(); + channel_.cancel(); return 1U; } default: BOOST_ASSERT(false); return 0; @@ -136,7 +136,7 @@ public: template auto async_exec(resp3::request const& req, Adapter adapter, CompletionToken token) { - BOOST_ASSERT_MSG(req.size() <= adapter.get_supported_response_size(), "Request and adapter have incompatible sizes."); + BOOST_ASSERT_MSG(req.size() <= adapter.get_supported_response_size(), "Request and response have incompatible sizes."); return boost::asio::async_compose < CompletionToken @@ -149,9 +149,10 @@ public: { auto f = detail::make_adapter_wrapper(adapter); - return guarded_op_.async_wait( - resp3::async_read(derived().next_layer(), make_dynamic_buffer(adapter.get_max_read_size(0)), f, boost::asio::deferred), - std::move(token)); + return boost::asio::async_compose + < CompletionToken + , void(boost::system::error_code, std::size_t) + >(detail::receive_op{&derived(), f}, token, channel_); } template @@ -163,10 +164,14 @@ public: >(detail::run_op{&derived()}, token, writer_timer_); } + void set_max_buffer_read_size(std::size_t max_read_size) noexcept + {max_read_size_ = max_read_size;} + private: using clock_type = std::chrono::steady_clock; using clock_traits_type = boost::asio::wait_traits; using timer_type = boost::asio::basic_waitable_timer; + using channel_type = boost::asio::experimental::channel; auto derived() -> Derived& { return static_cast(*this); } @@ -272,7 +277,17 @@ private: template friend struct detail::run_op; template friend struct detail::exec_op; template friend struct detail::exec_read_op; - template friend struct detail::send_receive_op; + template friend struct detail::receive_op; + template friend struct detail::wait_receive_op; + + template + auto async_wait_receive(CompletionToken token) + { + return boost::asio::async_compose + < CompletionToken + , void(boost::system::error_code) + >(wait_receive_op{&derived()}, token, channel_); + } void cancel_push_requests() { @@ -299,12 +314,12 @@ private: std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend); } - if (derived().is_open() && cmds_ == 0 && write_buffer_.empty()) + if (derived().is_open() && !is_waiting_response() && write_buffer_.empty()) writer_timer_.cancel(); } - auto make_dynamic_buffer(std::size_t max_read_size = 512) - { return boost::asio::dynamic_buffer(read_buffer_, max_read_size); } + auto make_dynamic_buffer() + { return boost::asio::dynamic_buffer(read_buffer_, max_read_size_); } template auto reader(CompletionToken&& token) @@ -336,7 +351,6 @@ private: void stage_request(req_info& ri) { write_buffer_ += ri.get_request().payload(); - cmds_ += ri.get_request().size(); ri.mark_staged(); } @@ -358,17 +372,22 @@ private: } } + bool is_waiting_response() const noexcept + { + return !std::empty(reqs_) && reqs_.front()->is_written(); + } + // Notice we use a timer to simulate a condition-variable. It is // also more suitable than a channel and the notify operation does // not suspend. timer_type writer_timer_; timer_type read_timer_; - detail::guarded_operation guarded_op_; + channel_type channel_; std::pmr::string read_buffer_; std::pmr::string write_buffer_; - std::size_t cmds_ = 0; reqs_type reqs_; + std::size_t max_read_size_ = (std::numeric_limits::max)(); }; } // aedis diff --git a/include/aedis/detail/connection_ops.hpp b/include/aedis/detail/connection_ops.hpp index b2844b04..5237833c 100644 --- a/include/aedis/detail/connection_ops.hpp +++ b/include/aedis/detail/connection_ops.hpp @@ -9,7 +9,6 @@ #include #include -#include #include #include #include @@ -28,6 +27,30 @@ namespace aedis::detail { +template +struct wait_receive_op { + Conn* conn; + boost::asio::coroutine coro{}; + + template + void + operator()(Self& self , boost::system::error_code ec = {}) + { + BOOST_ASIO_CORO_REENTER (coro) + { + BOOST_ASIO_CORO_YIELD + conn->channel_.async_send(boost::system::error_code{}, 0, std::move(self)); + AEDIS_CHECK_OP0(;); + + BOOST_ASIO_CORO_YIELD + conn->channel_.async_send(boost::system::error_code{}, 0, std::move(self)); + AEDIS_CHECK_OP0(;); + + self.complete({}); + } + } +}; + template struct exec_read_op { Conn* conn; @@ -48,7 +71,7 @@ struct exec_read_op { // Loop reading the responses to this request. BOOST_ASSERT(!conn->reqs_.empty()); while (cmds != 0) { - BOOST_ASSERT(conn->cmds_ != 0); + BOOST_ASSERT(conn->is_waiting_response()); //----------------------------------- // If we detect a push in the middle of a request we have @@ -67,7 +90,7 @@ struct exec_read_op { // the receive_op wait for it to be done and continue. if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push) { BOOST_ASIO_CORO_YIELD - conn->guarded_op_.async_run(std::move(self)); + conn->async_wait_receive(std::move(self)); AEDIS_CHECK_OP1(conn->cancel(operation::run);); continue; } @@ -76,7 +99,7 @@ struct exec_read_op { BOOST_ASIO_CORO_YIELD resp3::async_read( conn->next_layer(), - conn->make_dynamic_buffer(adapter.get_max_read_size(index)), + conn->make_dynamic_buffer(), [i = index, adpt = adapter] (resp3::node const& nd, boost::system::error_code& ec) mutable { adpt(i, nd, ec); }, std::move(self)); @@ -88,9 +111,6 @@ struct exec_read_op { BOOST_ASSERT(cmds != 0); --cmds; - - BOOST_ASSERT(conn->cmds_ != 0); - --conn->cmds_; } self.complete({}, read_size); @@ -98,6 +118,46 @@ struct exec_read_op { } }; +template +struct receive_op { + Conn* conn; + Adapter adapter; + std::size_t read_size = 0; + boost::asio::coroutine coro{}; + + template + void + operator()( Self& self + , boost::system::error_code ec = {} + , std::size_t n = 0) + { + BOOST_ASIO_CORO_REENTER (coro) + { + BOOST_ASIO_CORO_YIELD + conn->channel_.async_receive(std::move(self)); + AEDIS_CHECK_OP1(;); + + BOOST_ASIO_CORO_YIELD + resp3::async_read(conn->next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self)); + if (ec || is_cancelled(self)) { + conn->cancel(operation::run); + conn->cancel(operation::receive); + self.complete(!!ec ? ec : boost::asio::error::operation_aborted, {}); + return; + } + + read_size = n; + + BOOST_ASIO_CORO_YIELD + conn->channel_.async_receive(std::move(self)); + AEDIS_CHECK_OP1(;); + + self.complete({}, read_size); + return; + } + } +}; + template struct exec_op { using req_info_type = typename Conn::req_info; @@ -169,7 +229,6 @@ EXEC_OP_WAIT: BOOST_ASSERT(!conn->reqs_.empty()); BOOST_ASSERT(conn->reqs_.front() != nullptr); - BOOST_ASSERT(conn->cmds_ != 0); BOOST_ASIO_CORO_YIELD conn->async_exec_read(adapter, conn->reqs_.front()->get_number_of_commands(), std::move(self)); AEDIS_CHECK_OP1(;); @@ -179,7 +238,7 @@ EXEC_OP_WAIT: BOOST_ASSERT(!conn->reqs_.empty()); conn->reqs_.pop_front(); - if (conn->cmds_ == 0) { + if (!conn->is_waiting_response()) { conn->read_timer_.cancel_one(); if (!conn->reqs_.empty()) conn->writer_timer_.cancel_one(); @@ -207,7 +266,7 @@ struct run_op { BOOST_ASIO_CORO_REENTER (coro) { conn->write_buffer_.clear(); - conn->cmds_ = 0; + conn->read_buffer_.clear(); BOOST_ASIO_CORO_YIELD boost::asio::experimental::make_parallel_group( @@ -245,7 +304,7 @@ struct writer_op { BOOST_ASIO_CORO_REENTER (coro) for (;;) { - while (!conn->reqs_.empty() && conn->cmds_ == 0 && conn->write_buffer_.empty()) { + while (!conn->reqs_.empty() && !conn->is_waiting_response() && conn->write_buffer_.empty()) { conn->coalesce_requests(); BOOST_ASIO_CORO_YIELD boost::asio::async_write(conn->next_layer(), boost::asio::buffer(conn->write_buffer_), std::move(self)); @@ -324,9 +383,9 @@ struct reader_op { || conn->reqs_.empty() || (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)) { BOOST_ASIO_CORO_YIELD - conn->guarded_op_.async_run(std::move(self)); + conn->async_wait_receive(std::move(self)); } else { - BOOST_ASSERT(conn->cmds_ != 0); + BOOST_ASSERT(conn->is_waiting_response()); BOOST_ASSERT(!conn->reqs_.empty()); BOOST_ASSERT(conn->reqs_.front()->get_number_of_commands() != 0); conn->reqs_.front()->proceed(); diff --git a/include/aedis/detail/guarded_operation.hpp b/include/aedis/detail/guarded_operation.hpp deleted file mode 100644 index 17d29e37..00000000 --- a/include/aedis/detail/guarded_operation.hpp +++ /dev/null @@ -1,111 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#ifndef AEDIS_DETAIL_GUARDED_OPERATION_HPP -#define AEDIS_DETAIL_GUARDED_OPERATION_HPP - -#include - -namespace aedis::detail { - -template -struct send_receive_op { - using channel_type = boost::asio::experimental::channel; - channel_type* channel; - boost::asio::coroutine coro{}; - - template - void operator()(Self& self, boost::system::error_code ec = {}) - { - BOOST_ASIO_CORO_REENTER (coro) - { - BOOST_ASIO_CORO_YIELD - channel->async_send(boost::system::error_code{}, 0, std::move(self)); - AEDIS_CHECK_OP0(;); - - BOOST_ASIO_CORO_YIELD - channel->async_send(boost::system::error_code{}, 0, std::move(self)); - AEDIS_CHECK_OP0(;); - - self.complete({}); - } - } -}; - -template -struct wait_op { - using channel_type = boost::asio::experimental::channel; - channel_type* channel; - Op op; - std::size_t res = 0; - boost::asio::coroutine coro{}; - - template - void - operator()( Self& self - , boost::system::error_code ec = {} - , std::size_t n = 0) - { - BOOST_ASIO_CORO_REENTER (coro) - { - BOOST_ASIO_CORO_YIELD - channel->async_receive(std::move(self)); - AEDIS_CHECK_OP1(;); - - BOOST_ASIO_CORO_YIELD - std::move(op)(std::move(self)); - AEDIS_CHECK_OP1(channel->cancel();); - - res = n; - - BOOST_ASIO_CORO_YIELD - channel->async_receive(std::move(self)); - AEDIS_CHECK_OP1(;); - - self.complete({}, res); - return; - } - } -}; - -template -class guarded_operation { -public: - using executor_type = Executor; - guarded_operation(executor_type ex) : channel_{ex} {} - - template - auto async_run(CompletionToken&& token) - { - return boost::asio::async_compose - < CompletionToken - , void(boost::system::error_code) - >(send_receive_op{&channel_}, token, channel_); - } - - template - auto async_wait(Op&& op, CompletionToken token) - { - return boost::asio::async_compose - < CompletionToken - , void(boost::system::error_code, std::size_t) - >(wait_op{&channel_, std::move(op)}, token, channel_); - } - - void cancel() {channel_.cancel();} - -private: - using channel_type = boost::asio::experimental::channel; - - template friend struct send_receive_op; - template friend struct wait_op; - - channel_type channel_; -}; - -} // aedis::detail - -#endif // AEDIS_DETAIL_GUARDED_OPERATION_HPP diff --git a/include/aedis/ssl/connection.hpp b/include/aedis/ssl/connection.hpp index 17e0b4c0..d95e3354 100644 --- a/include/aedis/ssl/connection.hpp +++ b/include/aedis/ssl/connection.hpp @@ -134,15 +134,21 @@ public: auto& lowest_layer() noexcept { return stream_.lowest_layer(); } + /// Sets the maximum size of the read buffer. + void set_max_buffer_read_size(std::size_t max_read_size) noexcept + { base_type::set_max_buffer_read_size(max_read_size); } + private: using this_type = basic_connection; template friend class aedis::detail::connection_base; template friend struct aedis::detail::exec_op; template friend struct aedis::detail::exec_read_op; + template friend struct detail::receive_op; template friend struct aedis::detail::run_op; template friend struct aedis::detail::writer_op; template friend struct aedis::detail::reader_op; + template friend struct detail::wait_receive_op; auto is_open() const noexcept { return stream_.next_layer().is_open(); } void close() { stream_.next_layer().close(); } diff --git a/tests/conn_push.cpp b/tests/conn_push.cpp index b65b55ca..748c3f72 100644 --- a/tests/conn_push.cpp +++ b/tests/conn_push.cpp @@ -108,10 +108,6 @@ struct adapter_error { [[nodiscard]] auto get_supported_response_size() const noexcept { return static_cast(-1);} - - [[nodiscard]] - auto get_max_read_size(std::size_t) const noexcept - { return static_cast(-1);} }; BOOST_AUTO_TEST_CASE(test_push_adapter) diff --git a/tests/conn_run_cancel.cpp b/tests/conn_run_cancel.cpp index 47de0ea1..394c0be3 100644 --- a/tests/conn_run_cancel.cpp +++ b/tests/conn_run_cancel.cpp @@ -168,31 +168,6 @@ BOOST_AUTO_TEST_CASE(reset_before_run_completes) ioc.run(); } -using slave_operation = aedis::detail::guarded_operation; - -auto master(std::shared_ptr op) -> net::awaitable -{ - co_await op->async_run(net::use_awaitable); -} - -auto slave(std::shared_ptr op) -> net::awaitable -{ - net::steady_timer timer{co_await net::this_coro::executor}; - timer.expires_after(std::chrono::seconds{1}); - - co_await op->async_wait(timer.async_wait(net::deferred), net::use_awaitable); - std::cout << "Kabuf" << std::endl; -} - -BOOST_AUTO_TEST_CASE(slave_op) -{ - net::io_context ioc; - auto op = std::make_shared(ioc.get_executor()); - net::co_spawn(ioc, master(op), net::detached); - net::co_spawn(ioc, slave(op), net::detached); - ioc.run(); -} - #else int main(){} #endif diff --git a/tests/issue_50.cpp b/tests/issue_50.cpp new file mode 100644 index 00000000..bb405841 --- /dev/null +++ b/tests/issue_50.cpp @@ -0,0 +1,74 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#if defined(BOOST_ASIO_HAS_CO_AWAIT) +#include +#include + +#include "../examples/common/common.hpp" + +namespace net = boost::asio; +namespace resp3 = aedis::resp3; +using namespace net::experimental::awaitable_operators; +using steady_timer = net::use_awaitable_t<>::as_default_on_t; +using aedis::adapt; + +// Push consumer +auto receiver(std::shared_ptr conn) -> net::awaitable +{ + for (;;) + co_await conn->async_receive(); +} + +auto periodic_task(std::shared_ptr conn) -> net::awaitable +{ + net::steady_timer timer{co_await net::this_coro::executor}; + for (int i = 0; i < 10; ++i) { + timer.expires_after(std::chrono::seconds(2)); + co_await timer.async_wait(net::use_awaitable); + + // Key is not set so it will cause an error since we are passing + // an adapter that does not accept null, this will cause an error + // that result in the connection being closed. + resp3::request req; + req.push("GET", "mykey"); + std::tuple response; + auto [ec, u] = co_await conn->async_exec(req, adapt(response), + net::as_tuple(net::use_awaitable)); + if (ec) { + std::cout << "Error: " << ec << std::endl; + } else { + std::cout << "Response is: " << std::get<0>(response) << std::endl; + } + } + + std::cout << "Periodic task done!" << std::endl; +} + +auto co_main(std::string host, std::string port) -> net::awaitable +{ + auto ex = co_await net::this_coro::executor; + auto conn = std::make_shared(ex); + steady_timer timer{ex}; + + resp3::request req; + req.push("HELLO", 3); + req.push("SUBSCRIBE", "channel"); + + // The loop will reconnect on connection lost. To exit type Ctrl-C twice. + for (int i = 0; i < 10; ++i) { + co_await connect(conn, host, port); + co_await ((conn->async_run() || receiver(conn) || healthy_checker(conn) || periodic_task(conn)) && + conn->async_exec(req)); + + conn->reset_stream(); + timer.expires_after(std::chrono::seconds{1}); + co_await timer.async_wait(); + } +} + +#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)