diff --git a/README.md b/README.md index 355316a1..e6b70566 100644 --- a/README.md +++ b/README.md @@ -686,6 +686,10 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php. performance improvement where one of my benchmark programs passed from 190k/s to 473k/s. +* The connection has a new member `get_usage()` that returns the + connection usage information, such as number of bytes writen, + received etc. + ### v1.4.2 (incorporates changes to conform the boost review and more) * Adds `boost::redis::config::database_index` to make it possible to diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 1721a82b..c1cb7dea 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -292,6 +292,10 @@ public: void set_receive_response(Response& response) { impl_.set_receive_response(response); } + /// Returns connection usage information. + usage get_usage() const noexcept + { return impl_.get_usage(); } + private: using timer_type = asio::basic_waitable_timer< @@ -394,6 +398,10 @@ public: void set_receive_response(Response& response) { impl_.set_receive_response(response); } + /// Returns connection usage information. + usage get_usage() const noexcept + { return impl_.get_usage(); } + private: void async_run_impl( diff --git a/include/boost/redis/detail/connection_base.hpp b/include/boost/redis/detail/connection_base.hpp index ed399a85..78ee229e 100644 --- a/include/boost/redis/detail/connection_base.hpp +++ b/include/boost/redis/detail/connection_base.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -40,7 +41,8 @@ #include #include -namespace boost::redis::detail { +namespace boost::redis::detail +{ template struct exec_op { @@ -48,8 +50,6 @@ struct exec_op { using adapter_type = typename Conn::adapter_type; Conn* conn_ = nullptr; - request const* req_ = nullptr; - adapter_type adapter{}; std::shared_ptr info_ = nullptr; asio::coroutine coro{}; @@ -60,14 +60,12 @@ struct exec_op { { // Check whether the user wants to wait for the connection to // be stablished. - if (req_->get_config().cancel_if_not_connected && !conn_->is_open()) { + if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) { BOOST_ASIO_CORO_YIELD asio::post(std::move(self)); return self.complete(error::not_connected, 0); } - info_ = std::allocate_shared(asio::get_associated_allocator(self), *req_, adapter, conn_->get_executor()); - conn_->add_request_info(info_); EXEC_OP_WAIT: @@ -329,6 +327,10 @@ public: /// Type of the next layer using next_layer_type = asio::ssl::stream>; + using clock_type = std::chrono::steady_clock; + using clock_traits_type = asio::wait_traits; + using timer_type = asio::basic_waitable_timer; + using receiver_adapter_type = std::function const&, system::error_code&)>; using this_type = connection_base; @@ -391,12 +393,14 @@ public: { using namespace boost::redis::adapter; auto f = boost_redis_adapt(resp); - BOOST_ASSERT_MSG(req.size() <= f.get_supported_response_size(), "Request and response have incompatible sizes."); + BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes."); + + auto info = std::make_shared(req, f, get_executor()); return asio::async_compose < CompletionToken , void(system::error_code, std::size_t) - >(redis::detail::exec_op{this, &req, f}, token, writer_timer_); + >(exec_op{this, info}, token, writer_timer_); } template @@ -427,12 +431,12 @@ public: receive_adapter_ = adapter::detail::make_adapter_wrapper(g); } + usage get_usage() const noexcept + { return usage_; } + private: - using clock_type = std::chrono::steady_clock; - using clock_traits_type = asio::wait_traits; - using timer_type = asio::basic_waitable_timer; using receive_channel_type = asio::experimental::channel; - using runner_type = redis::detail::runner; + using runner_type = runner; using adapter_type = std::function const&, system::error_code&)>; auto use_ssl() const noexcept @@ -545,7 +549,7 @@ private: , action_{action::none} , req_{&req} , adapter_{} - , cmds_{std::size(req)} + , expected_responses_{req.get_expected_responses()} , status_{status::none} , ec_{{}} , read_size_{0} @@ -554,7 +558,7 @@ private: adapter_ = [this, adapter](node_type const& nd, system::error_code& ec) { - auto const i = std::size(*req_) - cmds_; + auto const i = req_->get_expected_responses() - expected_responses_; adapter(i, nd, ec); }; } @@ -611,7 +615,7 @@ private: wrapped_adapter_type adapter_; // Contains the number of commands that haven't been read yet. - std::size_t cmds_; + std::size_t expected_responses_; status status_; system::error_code ec_; @@ -625,16 +629,16 @@ private: using reqs_type = std::deque>; - template friend struct redis::detail::reader_op; - template friend struct redis::detail::writer_op; - template friend struct redis::detail::run_op; - template friend struct redis::detail::exec_op; - template friend struct redis::detail::run_all_op; + template friend struct reader_op; + template friend struct writer_op; + template friend struct run_op; + template friend struct exec_op; + template friend struct run_all_op; void cancel_push_requests() { auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return !(ptr->is_staged() && ptr->req_->size() == 0); + return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); }); std::for_each(point, std::end(reqs_), [](auto const& ptr) { @@ -671,7 +675,7 @@ private: return asio::async_compose < CompletionToken , void(system::error_code) - >(redis::detail::reader_op{this, l}, token, writer_timer_); + >(reader_op{this, l}, token, writer_timer_); } template @@ -680,7 +684,7 @@ private: return asio::async_compose < CompletionToken , void(system::error_code) - >(redis::detail::writer_op{this, l}, token, writer_timer_); + >(writer_op{this, l}, token, writer_timer_); } template @@ -691,7 +695,7 @@ private: return asio::async_compose < CompletionToken , void(system::error_code) - >(redis::detail::run_op{this, l}, token, writer_timer_); + >(run_op{this, l}, token, writer_timer_); } [[nodiscard]] bool coalesce_requests() @@ -706,8 +710,11 @@ private: // Stage the request. write_buffer_ += ri->req_->payload(); ri->mark_staged(); + usage_.commands_sent += ri->expected_responses_; }); + usage_.bytes_sent += std::size(write_buffer_); + return point != std::cend(reqs_); } @@ -758,13 +765,13 @@ private: return (resp3::to_type(read_buffer_.front()) == resp3::type::push) || reqs_.empty() - || (!reqs_.empty() && reqs_.front()->cmds_ == 0) + || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0) || !is_waiting_response(); // Added to deal with MONITOR. } auto get_suggested_buffer_growth() const noexcept { - return parser_.get_suggested_buffer_growth(1024); + return parser_.get_suggested_buffer_growth(4096); } enum class parse_result { needs_more, push, resp }; @@ -773,6 +780,14 @@ private: parse_ret_type on_finish_parsing(parse_result t) { + if (t == parse_result::push) { + usage_.pushes_received += 1; + usage_.push_bytes_received += parser_.get_consumed(); + } else { + usage_.responses_received += 1; + usage_.response_bytes_received += parser_.get_consumed(); + } + on_push_ = false; dbuf_.consume(parser_.get_consumed()); auto const res = std::make_pair(t, parser_.get_consumed()); @@ -808,7 +823,7 @@ private: BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)"); BOOST_ASSERT(!reqs_.empty()); BOOST_ASSERT(reqs_.front() != nullptr); - BOOST_ASSERT(reqs_.front()->cmds_ != 0); + BOOST_ASSERT(reqs_.front()->expected_responses_ != 0); if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec)) return std::make_pair(parse_result::needs_more, 0); @@ -821,7 +836,7 @@ private: reqs_.front()->read_size_ += parser_.get_consumed(); - if (--reqs_.front()->cmds_ == 0) { + if (--reqs_.front()->expected_responses_ == 0) { // Done with this request. reqs_.front()->proceed(); reqs_.pop_front(); @@ -849,6 +864,8 @@ private: reqs_type reqs_; resp3::parser parser_{}; bool on_push_ = false; + + usage usage_; }; } // boost::redis::detail diff --git a/include/boost/redis/logger.hpp b/include/boost/redis/logger.hpp index 1bdaf480..b7c1e09c 100644 --- a/include/boost/redis/logger.hpp +++ b/include/boost/redis/logger.hpp @@ -30,7 +30,11 @@ public: * @ingroup high-level-api */ enum class level - { /// Emergency + { + /// Disabled + disabled, + + /// Emergency emerg, /// Alert @@ -60,7 +64,7 @@ public: * * @param l Log level. */ - logger(level l = level::info) + logger(level l = level::disabled) : level_{l} {} diff --git a/include/boost/redis/request.hpp b/include/boost/redis/request.hpp index b3508c58..6d3e44b7 100644 --- a/include/boost/redis/request.hpp +++ b/include/boost/redis/request.hpp @@ -84,8 +84,12 @@ public: request(config cfg = config{true, false, true, true}) : cfg_{cfg} {} + //// Returns the number of responses expected for this request. + [[nodiscard]] auto get_expected_responses() const noexcept -> std::size_t + { return expected_responses_;}; + //// Returns the number of commands contained in this request. - [[nodiscard]] auto size() const noexcept -> std::size_t + [[nodiscard]] auto get_commands() const noexcept -> std::size_t { return commands_;}; [[nodiscard]] auto payload() const noexcept -> std::string_view @@ -99,6 +103,7 @@ public: { payload_.clear(); commands_ = 0; + expected_responses_ = 0; has_hello_priority_ = false; } @@ -303,8 +308,10 @@ public: private: void check_cmd(std::string_view cmd) { + ++commands_; + if (!detail::has_response(cmd)) - ++commands_; + ++expected_responses_; if (cmd == "HELLO") has_hello_priority_ = cfg_.hello_with_priority; @@ -313,6 +320,7 @@ private: config cfg_; std::string payload_; std::size_t commands_ = 0; + std::size_t expected_responses_ = 0; bool has_hello_priority_ = false; }; diff --git a/include/boost/redis/usage.hpp b/include/boost/redis/usage.hpp new file mode 100644 index 00000000..91224e96 --- /dev/null +++ b/include/boost/redis/usage.hpp @@ -0,0 +1,43 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_USAGE_HPP +#define BOOST_REDIS_USAGE_HPP + +namespace boost::redis +{ + +/** @brief Connection usage information. + * @ingroup high-level-api + * + * @note: To simplify the implementation, the commands_sent and + * bytes_sent in the struct below are computed just before writing to + * the socket, which means on error they might not represent exaclty + * what has been received by the Redis server. + */ +struct usage { + /// Number of commands sent. + std::size_t commands_sent = 0; + + /// Number of bytes sent. + std::size_t bytes_sent = 0; + + /// Number of responses received. + std::size_t responses_received = 0; + + /// Number of pushes received. + std::size_t pushes_received = 0; + + /// Number of response-bytes received. + std::size_t response_bytes_received = 0; + + /// Number of push-bytes received. + std::size_t push_bytes_received = 0; +}; + +} // boost::redis + +#endif // BOOST_REDIS_USAGE_HPP diff --git a/tests/test_conn_echo_stress.cpp b/tests/test_conn_echo_stress.cpp index 48bfac0b..0cbf3a4c 100644 --- a/tests/test_conn_echo_stress.cpp +++ b/tests/test_conn_echo_stress.cpp @@ -26,6 +26,20 @@ using boost::redis::ignore_t; using boost::redis::logger; using boost::redis::config; using boost::redis::connection; +using boost::redis::usage; + +std::ostream& operator<<(std::ostream& os, usage const& u) +{ + os + << "Commands sent: " << u.commands_sent << "\n" + << "Bytes sent: " << u.bytes_sent << "\n" + << "Responses received: " << u.responses_received << "\n" + << "Pushes received: " << u.pushes_received << "\n" + << "Response bytes received: " << u.response_bytes_received << "\n" + << "Push bytes received: " << u.push_bytes_received; + + return os; +} auto push_consumer(std::shared_ptr conn, int expected) -> net::awaitable { @@ -73,10 +87,9 @@ echo_session( } } -auto async_echo_stress() -> net::awaitable +auto async_echo_stress(std::shared_ptr conn) -> net::awaitable { auto ex = co_await net::this_coro::executor; - auto conn = std::make_shared(ex); config cfg; cfg.health_check_interval = std::chrono::seconds::zero(); run(conn, cfg, @@ -117,8 +130,12 @@ auto async_echo_stress() -> net::awaitable BOOST_AUTO_TEST_CASE(echo_stress) { net::io_context ioc; - net::co_spawn(ioc, async_echo_stress(), net::detached); + auto conn = std::make_shared(ioc); + net::co_spawn(ioc, async_echo_stress(conn), net::detached); ioc.run(); + + std::cout << "-------------------\n" + << conn->get_usage() << std::endl; } #else