From 5328cdff9abbe3b0738d79127e3ec3638efb0185 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Fri, 17 Jun 2022 22:51:51 +0200 Subject: [PATCH] Adds coalesce option. --- aedis/adapter/adapt.hpp | 6 +++-- aedis/connection.hpp | 45 ++++++++++++++++++++------------- aedis/detail/connection_ops.hpp | 10 +------- aedis/resp3/compose.hpp | 10 +++++--- aedis/resp3/read.hpp | 9 ++++--- examples/echo_server.cpp | 10 +++++--- tests/check.hpp | 4 +-- tests/high_level.cpp | 11 ++++---- tools/echo_server_client.cpp | 2 +- 9 files changed, 59 insertions(+), 48 deletions(-) diff --git a/aedis/adapter/adapt.hpp b/aedis/adapter/adapt.hpp index 6279aa09..eeba4322 100644 --- a/aedis/adapter/adapt.hpp +++ b/aedis/adapter/adapt.hpp @@ -15,7 +15,8 @@ namespace adapter { template using adapter_t = typename detail::adapter_t; -/** \brief Creates a dummy response adapter. +/** \internal + \brief Creates a dummy response adapter. \ingroup any The adapter returned by this function ignores responses. It is @@ -41,7 +42,8 @@ inline auto adapt() noexcept { return detail::response_traits::adapt(); } -/** \brief Adapts user data to read operations. +/** \internal + * \brief Adapts user data to read operations. * \ingroup any * * STL containers, \c std::tuple and built-in types are supported and diff --git a/aedis/connection.hpp b/aedis/connection.hpp index 8caec25c..d7c0da4c 100644 --- a/aedis/connection.hpp +++ b/aedis/connection.hpp @@ -64,6 +64,9 @@ public: /// The maximum size allwed in a read operation. std::size_t max_read_size = (std::numeric_limits::max)(); + + // Whether to coalesce requests. + bool coalesce_requests = true; }; /** \brief Constructor. @@ -265,26 +268,24 @@ private: struct req_info { boost::asio::steady_timer timer; + resp3::request const* req = nullptr; std::size_t n_cmds = 0; bool stop = false; bool expects_response() const noexcept { return n_cmds != 0;} - - void pop() noexcept - { - --n_cmds; - } + void pop() noexcept { --n_cmds; } }; bool add_request(resp3::request const& req) { BOOST_ASSERT(!req.payload().empty()); auto const empty = reqs_.empty(); - reqs_.push_back(make_req_info(req.commands().size())); + + reqs_.push_back(make_req_info()); reqs_.back()->timer.expires_at(std::chrono::steady_clock::time_point::max()); - payload_next_ += req.payload(); - for (auto cmd : req.commands()) - cmds_next_.push(cmd.first); + reqs_.back()->req = &req; + reqs_.back()->n_cmds = req.commands().size(); + reqs_.back()->stop = false; return empty && socket_ != nullptr; } @@ -403,16 +404,12 @@ private: >(detail::exec_exit_op{this}, token, resv_); } - std::shared_ptr make_req_info(std::size_t cmds) + std::shared_ptr make_req_info() { if (pool_.empty()) - return std::make_shared( - boost::asio::steady_timer{resv_.get_executor()}, - cmds, false); + return std::make_shared(boost::asio::steady_timer{resv_.get_executor()}); auto ret = pool_.back(); - ret->n_cmds = cmds; - ret->stop = false; pool_.pop_back(); return ret; } @@ -422,6 +419,22 @@ private: pool_.push_back(info); } + void coalesce_requests() + { + // Coaleces all requests: Copies the request to the variables + // that won't be touched while async_write is suspended. + BOOST_ASSERT(payload_.empty()); + BOOST_ASSERT(!reqs_.empty()); + + auto const size = cfg_.coalesce_requests ? reqs_.size() : 1; + for (auto i = 0UL; i < size; ++i) { + payload_ += reqs_.at(i)->req->payload(); + for (auto cmd : reqs_.at(i)->req->commands()) { + cmds_.push(cmd.first); + } + } + } + using channel_type = boost::asio::experimental::channel; // IO objects @@ -440,9 +453,7 @@ private: std::string read_buffer_; std::string payload_; - std::string payload_next_; std::queue cmds_; - std::queue cmds_next_; std::deque> reqs_; std::vector> pool_; diff --git a/aedis/detail/connection_ops.hpp b/aedis/detail/connection_ops.hpp index 4255f342..32e75332 100644 --- a/aedis/detail/connection_ops.hpp +++ b/aedis/detail/connection_ops.hpp @@ -222,15 +222,7 @@ struct exec_write_op { { reenter (coro) { - BOOST_ASSERT(!cli->payload_next_.empty()); - - // Copies the request to the variables that won't be touched - // while async_write is suspended. - std::swap(cli->cmds_next_, cli->cmds_); - std::swap(cli->payload_next_, cli->payload_); - cli->cmds_next_ = {}; - cli->payload_next_.clear(); - + cli->coalesce_requests(); cli->write_timer_.expires_after(cli->cfg_.write_timeout); yield aedis::detail::async_write(*cli->socket_, cli->write_timer_, boost::asio::buffer(cli->payload_), std::move(self)); if (ec) { diff --git a/aedis/resp3/compose.hpp b/aedis/resp3/compose.hpp index e7ad747d..3943ef5a 100644 --- a/aedis/resp3/compose.hpp +++ b/aedis/resp3/compose.hpp @@ -97,8 +97,9 @@ struct add_bulk_impl> { } // detail -/** @brief Adds a resp3 header to the request. - * @ingroup any +/** \internal + * \brief Adds a resp3 header to the request. + * \ingroup any * * See mystruct.hpp for an example. */ @@ -142,8 +143,9 @@ void add_blob(Request& to, boost::string_view blob) to += separator; } -/** @brief Adds a separator to the request. - * @ingroup any +/** \internal + * \brief Adds a separator to the request. + * \ingroup any * * See mystruct.hpp for an example. */ diff --git a/aedis/resp3/read.hpp b/aedis/resp3/read.hpp index 49bfbc0b..f7df8654 100644 --- a/aedis/resp3/read.hpp +++ b/aedis/resp3/read.hpp @@ -18,7 +18,8 @@ namespace aedis { namespace resp3 { -/** \brief Reads a complete response to a command sychronously. +/** \internal + * \brief Reads a complete response to a command sychronously. * \ingroup any * * This function reads a complete response to a command or a @@ -102,7 +103,8 @@ read( return consumed; } -/** \brief Reads a complete response to a command sychronously. +/** \internal + * \brief Reads a complete response to a command sychronously. * \ingroup any * * Same as the error_code overload but throws on error. @@ -126,7 +128,8 @@ read( return n; } -/** @brief Reads a complete response to a Redis command asynchronously. +/** \internal + * \brief Reads a complete response to a Redis command asynchronously. * \ingroup any * * This function reads a complete response to a command or a diff --git a/examples/echo_server.cpp b/examples/echo_server.cpp index 2b3030ea..103ca2a4 100644 --- a/examples/echo_server.cpp +++ b/examples/echo_server.cpp @@ -35,10 +35,12 @@ net::awaitable echo_loop(tcp_socket socket, std::shared_ptr db } } -net::awaitable listener() +net::awaitable listener(bool coalesce_requests) { auto ex = co_await net::this_coro::executor; - auto db = std::make_shared(ex); + connection::config cfg; + cfg.coalesce_requests = coalesce_requests; + auto db = std::make_shared(ex, cfg); db->async_run("127.0.0.1", "6379", net::detached); tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555}); @@ -46,11 +48,11 @@ net::awaitable listener() net::co_spawn(ex, echo_loop(co_await acc.async_accept(), db), net::detached); } -int main() +int main(int argc, char* argv[]) { try { net::io_context ioc; - co_spawn(ioc, listener(), net::detached); + co_spawn(ioc, listener(argc == 1), net::detached); ioc.run(); } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/tests/check.hpp b/tests/check.hpp index 22653983..c684855d 100644 --- a/tests/check.hpp +++ b/tests/check.hpp @@ -44,9 +44,9 @@ void expect_error(boost::system::error_code a, T expected = {}, std::string cons } inline -void expect_no_error(boost::system::error_code ec) +void expect_no_error(boost::system::error_code ec, std::string const& msg) { - expect_error(ec, boost::system::error_code{}); + expect_error(ec, boost::system::error_code{}, msg); } template diff --git a/tests/high_level.cpp b/tests/high_level.cpp index 2fa7afcd..2eca3a95 100644 --- a/tests/high_level.cpp +++ b/tests/high_level.cpp @@ -69,7 +69,7 @@ void test_quit() request req; req.push(command::quit); db->async_exec(req, aedis::adapt(), [](auto ec, auto r){ - expect_no_error(ec); + expect_no_error(ec, "test_quit"); }); db->async_run("127.0.0.1", "6379", [](auto ec){ @@ -100,7 +100,7 @@ push_consumer1(std::shared_ptr db) { { auto [ec, n] = co_await db->async_read_push(aedis::adapt(), as_tuple(net::use_awaitable)); - expect_no_error(ec); + expect_no_error(ec, "push_consumer"); } { @@ -135,7 +135,7 @@ net::awaitable run5(std::shared_ptr db) request req; req.push(command::quit); db->async_exec(req, aedis::adapt(), [](auto ec, auto){ - expect_no_error(ec); + expect_no_error(ec, "run5"); }); auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable)); @@ -146,7 +146,7 @@ net::awaitable run5(std::shared_ptr db) request req; req.push(command::quit); db->async_exec(req, aedis::adapt(), [](auto ec, auto){ - expect_no_error(ec); + expect_no_error(ec, "run5"); }); auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable)); @@ -235,7 +235,7 @@ void test_idle() req.push(command::client, "PAUSE", 5000); db->async_exec(req, aedis::adapt(), [](auto ec, auto r){ - expect_no_error(ec); + expect_no_error(ec, "test_idle"); }); db->async_run("127.0.0.1", "6379", [](auto ec){ @@ -358,7 +358,6 @@ int main() test_no_push_reader1(); test_no_push_reader2(); test_no_push_reader3(); - // TODO: Reconnect is not working. //test_reconnect(); test_exec_while_processing(); diff --git a/tools/echo_server_client.cpp b/tools/echo_server_client.cpp index 133021ae..16eeaf6a 100644 --- a/tools/echo_server_client.cpp +++ b/tools/echo_server_client.cpp @@ -31,7 +31,7 @@ example(boost::asio::ip::tcp::endpoint ep, std::string msg, int n) dbuffer.consume(n); } - std::printf("Ok: %s", msg.data()); + //std::printf("Ok: %s", msg.data()); } catch (std::exception const& e) { std::cerr << "Error: " << e.what() << std::endl; }