diff --git a/Makefile.am b/Makefile.am index 50d29e40..80904a22 100644 --- a/Makefile.am +++ b/Makefile.am @@ -30,6 +30,7 @@ if HAVE_CXX20 EXTRA_PROGRAMS += subscriber EXTRA_PROGRAMS += echo_server EXTRA_PROGRAMS += chat_room +EXTRA_PROGRAMS += echo_server_client endif CLEANFILES = @@ -52,6 +53,7 @@ subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp custom_adapter_SOURCES = $(top_srcdir)/examples/custom_adapter.cpp echo_server_SOURCES = $(top_srcdir)/examples/echo_server.cpp chat_room_SOURCES = $(top_srcdir)/examples/chat_room.cpp +echo_server_client_SOURCES = $(top_srcdir)/tools/echo_server_client.cpp endif nobase_include_HEADERS =\ diff --git a/aedis/generic/client.hpp b/aedis/generic/client.hpp index 5c605648..d76076ba 100644 --- a/aedis/generic/client.hpp +++ b/aedis/generic/client.hpp @@ -32,6 +32,8 @@ namespace generic { * This class keeps a connection open to the Redis server where * commands can be sent at any time. For more details, please see the * documentation of each individual function. + * + * https://redis.io/docs/reference/sentinel-clients */ template class client { @@ -51,6 +53,9 @@ public: /// Callback type of resp3 operations. using resp3_handler_type = std::function const&, boost::system::error_code&)>; + /// Type of the last layer + using next_layer_type = AsyncReadWriteStream; + using default_completion_token_type = boost::asio::default_completion_token_t; /** @brief Configuration parameters. @@ -63,19 +68,19 @@ public: std::string port = "6379"; /// Timeout of the \c async_resolve operation. - std::chrono::seconds resolve_timeout = std::chrono::seconds{5}; + std::chrono::milliseconds resolve_timeout = std::chrono::seconds{5}; /// Timeout of the \c async_connect operation. - std::chrono::seconds connect_timeout = std::chrono::seconds{5}; + std::chrono::milliseconds connect_timeout = std::chrono::seconds{5}; /// Timeout of the \c async_read operation. - std::chrono::seconds read_timeout = std::chrono::seconds{5}; + std::chrono::milliseconds read_timeout = std::chrono::seconds{5}; /// Timeout of the \c async_write operation. - std::chrono::seconds write_timeout = std::chrono::seconds{5}; + std::chrono::milliseconds write_timeout = std::chrono::seconds{5}; /// Time after which a connection is considered idle if no data is received. - std::chrono::seconds idle_timeout = std::chrono::seconds{10}; + std::chrono::milliseconds idle_timeout = std::chrono::seconds{10}; /// The maximum size allwed in a read operation. std::size_t max_read_size = (std::numeric_limits::max)(); @@ -119,7 +124,7 @@ public: template void send(Command cmd, Ts const&... args) { - auto const can_write = prepare_next(); + auto const can_write = prepare_next_req(); auto const before = requests_.size(); sr_.push(cmd, args...); @@ -150,7 +155,7 @@ public: if (begin == end) return; - auto const can_write = prepare_next(); + auto const can_write = prepare_next_req(); auto const before = requests_.size(); sr_.push_range2(cmd, key, begin, end); @@ -181,7 +186,7 @@ public: if (begin == end) return; - auto const can_write = prepare_next(); + auto const can_write = prepare_next_req(); auto const before = requests_.size(); sr_.push_range2(cmd, begin, end); @@ -342,6 +347,12 @@ public: on_push_ = [recv](std::size_t n){recv->on_push(n);}; } + void stop() + { + socket_->close(); + wait_write_timer_.expires_at(std::chrono::steady_clock::now()); + } + private: using command_info_type = std::pair; using time_point_type = std::chrono::time_point; @@ -360,15 +371,7 @@ private: template friend struct detail::read_write_check_op; template friend struct detail::wait_for_data_op; - void on_resolve() - { - // If we are coming from a connection that was lost we have to - // reset the socket to a fresh state. - socket_ = - std::make_shared(read_timer_.get_executor()); - } - - void on_connect() + void prepare_state() { // When we are reconnecting we can't simply call send(hello) // as that will add the command to the end of the queue, we need @@ -435,7 +438,7 @@ private: // Prepares the back of the queue to receive further commands. If // true is returned the request in the front of the queue can be // sent to the server. - bool prepare_next() + bool prepare_next_req() { if (info_.empty()) { info_.push_back({}); @@ -604,12 +607,6 @@ private: >(detail::check_idle_op{this}, token, check_idle_timer_); } - void on_reader_exit() - { - socket_->close(); - wait_write_timer_.expires_at(std::chrono::steady_clock::now()); - } - // Stores information about a request. struct info { // Set to true before calling async_write. diff --git a/aedis/generic/detail/client_ops.hpp b/aedis/generic/detail/client_ops.hpp index 62318c94..8f965763 100644 --- a/aedis/generic/detail/client_ops.hpp +++ b/aedis/generic/detail/client_ops.hpp @@ -126,7 +126,7 @@ struct check_idle_op { auto const now = std::chrono::steady_clock::now(); if (cli->last_data_ + cli->cfg_.idle_timeout < now) { - cli->on_reader_exit(); + cli->stop(); self.complete(error::idle_timeout); return; } @@ -228,8 +228,6 @@ struct init_op { self.complete(ec1); return; } - - cli->on_resolve(); } break; case 1: @@ -243,6 +241,11 @@ struct init_op { default: BOOST_ASSERT(false); } + cli->socket_ = + std::make_shared< + typename Client::next_layer_type + >(cli->read_timer_.get_executor()); + // Tries a connection with a timeout. We can use the writer // timer here as there is no ongoing write operation. cli->write_timer_.expires_after(cli->cfg_.connect_timeout); @@ -262,8 +265,6 @@ struct init_op { self.complete(ec1); return; } - - cli->on_connect(); } break; case 1: @@ -346,6 +347,8 @@ struct run_op { return; } + cli->prepare_state(); + yield cli->async_read_write_check(std::move(self)); if (ec) { self.complete(ec); @@ -523,7 +526,7 @@ struct reader_op { if (cli->read_buffer_.empty()) { yield cli->async_wait_for_data(std::move(self)); if (ec) { - cli->on_reader_exit(); + cli->stop(); self.complete(ec); return; } @@ -541,7 +544,7 @@ struct reader_op { yield cli->async_read(std::move(self)); if (ec) { - cli->on_reader_exit(); + cli->stop(); self.complete(ec); return; } diff --git a/examples/echo_server.cpp b/examples/echo_server.cpp index 32f3a3b3..72198006 100644 --- a/examples/echo_server.cpp +++ b/examples/echo_server.cpp @@ -41,8 +41,6 @@ public: void on_read(command cmd, std::size_t n) { - std::cout << "on_read: " << cmd << " " << n << std::endl; - switch (cmd) { case command::ping: if (resp_.front().value != "PONG") { @@ -52,7 +50,7 @@ public: break; case command::incr: - std::cout << "Echos so far: " << resp_.front().value << std::endl; + //std::cout << "Echos so far: " << resp_.front().value << std::endl; break; default: /* Ignore */; @@ -63,7 +61,6 @@ public: void on_write(std::size_t n) { - std::cout << "Number of bytes written: " << n << std::endl; } void on_push(std::size_t n) { } diff --git a/tools/echo_server_client.cpp b/tools/echo_server_client.cpp new file mode 100644 index 00000000..7f4eb872 --- /dev/null +++ b/tools/echo_server_client.cpp @@ -0,0 +1,51 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include + +namespace net = boost::asio; + +using net::ip::tcp; +using tcp_socket = net::use_awaitable_t<>::as_default_on_t; +using timer_type = net::use_awaitable_t<>::as_default_on_t; + +net::awaitable example(std::string msg, int n) +{ + try { + auto ex = co_await net::this_coro::executor; + + tcp::resolver resv{ex}; + auto const res = resv.resolve("127.0.0.1", "55555"); + tcp_socket socket{ex}; + co_await socket.async_connect(*std::begin(res)); + + std::string buffer; + auto dbuffer = net::dynamic_buffer(buffer); + for (int i = 0; i < n; ++i) { + co_await net::async_write(socket, net::buffer(msg)); + co_await net::async_read_until(socket, dbuffer, "\n"); + //std::printf("Nada %s", buffer.data()); + } + } catch (...) { + std::cerr << "Error" << std::endl; + } +} + +int main() +{ + try { + net::io_context ioc; + + int n = 100; + for (int i = 0; i < 500; ++i) + net::co_spawn(ioc, example("Some message\n", n), net::detached); + + ioc.run(); + } catch (std::exception const& e) { + std::cerr << e.what() << std::endl; + } +}