2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00
* Program to benchmark the high level client.
* First steps with sentinel support in the high level client.
This commit is contained in:
Marcelo Zimbres
2022-05-04 22:54:21 +02:00
parent 4be6e6cc1e
commit d1bf3a91be
5 changed files with 85 additions and 35 deletions

View File

@@ -30,6 +30,7 @@ if HAVE_CXX20
EXTRA_PROGRAMS += subscriber
EXTRA_PROGRAMS += echo_server
EXTRA_PROGRAMS += chat_room
EXTRA_PROGRAMS += echo_server_client
endif
CLEANFILES =
@@ -52,6 +53,7 @@ subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp
custom_adapter_SOURCES = $(top_srcdir)/examples/custom_adapter.cpp
echo_server_SOURCES = $(top_srcdir)/examples/echo_server.cpp
chat_room_SOURCES = $(top_srcdir)/examples/chat_room.cpp
echo_server_client_SOURCES = $(top_srcdir)/tools/echo_server_client.cpp
endif
nobase_include_HEADERS =\

View File

@@ -32,6 +32,8 @@ namespace generic {
* This class keeps a connection open to the Redis server where
* commands can be sent at any time. For more details, please see the
* documentation of each individual function.
*
* https://redis.io/docs/reference/sentinel-clients
*/
template <class AsyncReadWriteStream, class Command>
class client {
@@ -51,6 +53,9 @@ public:
/// Callback type of resp3 operations.
using resp3_handler_type = std::function<void(Command, resp3::node<boost::string_view> const&, boost::system::error_code&)>;
/// Type of the last layer
using next_layer_type = AsyncReadWriteStream;
using default_completion_token_type = boost::asio::default_completion_token_t<executor_type>;
/** @brief Configuration parameters.
@@ -63,19 +68,19 @@ public:
std::string port = "6379";
/// Timeout of the \c async_resolve operation.
std::chrono::seconds resolve_timeout = std::chrono::seconds{5};
std::chrono::milliseconds resolve_timeout = std::chrono::seconds{5};
/// Timeout of the \c async_connect operation.
std::chrono::seconds connect_timeout = std::chrono::seconds{5};
std::chrono::milliseconds connect_timeout = std::chrono::seconds{5};
/// Timeout of the \c async_read operation.
std::chrono::seconds read_timeout = std::chrono::seconds{5};
std::chrono::milliseconds read_timeout = std::chrono::seconds{5};
/// Timeout of the \c async_write operation.
std::chrono::seconds write_timeout = std::chrono::seconds{5};
std::chrono::milliseconds write_timeout = std::chrono::seconds{5};
/// Time after which a connection is considered idle if no data is received.
std::chrono::seconds idle_timeout = std::chrono::seconds{10};
std::chrono::milliseconds idle_timeout = std::chrono::seconds{10};
/// The maximum size allwed in a read operation.
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
@@ -119,7 +124,7 @@ public:
template <class... Ts>
void send(Command cmd, Ts const&... args)
{
auto const can_write = prepare_next();
auto const can_write = prepare_next_req();
auto const before = requests_.size();
sr_.push(cmd, args...);
@@ -150,7 +155,7 @@ public:
if (begin == end)
return;
auto const can_write = prepare_next();
auto const can_write = prepare_next_req();
auto const before = requests_.size();
sr_.push_range2(cmd, key, begin, end);
@@ -181,7 +186,7 @@ public:
if (begin == end)
return;
auto const can_write = prepare_next();
auto const can_write = prepare_next_req();
auto const before = requests_.size();
sr_.push_range2(cmd, begin, end);
@@ -342,6 +347,12 @@ public:
on_push_ = [recv](std::size_t n){recv->on_push(n);};
}
void stop()
{
socket_->close();
wait_write_timer_.expires_at(std::chrono::steady_clock::now());
}
private:
using command_info_type = std::pair<Command, std::size_t>;
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
@@ -360,15 +371,7 @@ private:
template <class T> friend struct detail::read_write_check_op;
template <class T> friend struct detail::wait_for_data_op;
void on_resolve()
{
// If we are coming from a connection that was lost we have to
// reset the socket to a fresh state.
socket_ =
std::make_shared<AsyncReadWriteStream>(read_timer_.get_executor());
}
void on_connect()
void prepare_state()
{
// When we are reconnecting we can't simply call send(hello)
// as that will add the command to the end of the queue, we need
@@ -435,7 +438,7 @@ private:
// Prepares the back of the queue to receive further commands. If
// true is returned the request in the front of the queue can be
// sent to the server.
bool prepare_next()
bool prepare_next_req()
{
if (info_.empty()) {
info_.push_back({});
@@ -604,12 +607,6 @@ private:
>(detail::check_idle_op<client>{this}, token, check_idle_timer_);
}
void on_reader_exit()
{
socket_->close();
wait_write_timer_.expires_at(std::chrono::steady_clock::now());
}
// Stores information about a request.
struct info {
// Set to true before calling async_write.

View File

@@ -126,7 +126,7 @@ struct check_idle_op {
auto const now = std::chrono::steady_clock::now();
if (cli->last_data_ + cli->cfg_.idle_timeout < now) {
cli->on_reader_exit();
cli->stop();
self.complete(error::idle_timeout);
return;
}
@@ -228,8 +228,6 @@ struct init_op {
self.complete(ec1);
return;
}
cli->on_resolve();
} break;
case 1:
@@ -243,6 +241,11 @@ struct init_op {
default: BOOST_ASSERT(false);
}
cli->socket_ =
std::make_shared<
typename Client::next_layer_type
>(cli->read_timer_.get_executor());
// Tries a connection with a timeout. We can use the writer
// timer here as there is no ongoing write operation.
cli->write_timer_.expires_after(cli->cfg_.connect_timeout);
@@ -262,8 +265,6 @@ struct init_op {
self.complete(ec1);
return;
}
cli->on_connect();
} break;
case 1:
@@ -346,6 +347,8 @@ struct run_op {
return;
}
cli->prepare_state();
yield cli->async_read_write_check(std::move(self));
if (ec) {
self.complete(ec);
@@ -523,7 +526,7 @@ struct reader_op {
if (cli->read_buffer_.empty()) {
yield cli->async_wait_for_data(std::move(self));
if (ec) {
cli->on_reader_exit();
cli->stop();
self.complete(ec);
return;
}
@@ -541,7 +544,7 @@ struct reader_op {
yield cli->async_read(std::move(self));
if (ec) {
cli->on_reader_exit();
cli->stop();
self.complete(ec);
return;
}

View File

@@ -41,8 +41,6 @@ public:
void on_read(command cmd, std::size_t n)
{
std::cout << "on_read: " << cmd << " " << n << std::endl;
switch (cmd) {
case command::ping:
if (resp_.front().value != "PONG") {
@@ -52,7 +50,7 @@ public:
break;
case command::incr:
std::cout << "Echos so far: " << resp_.front().value << std::endl;
//std::cout << "Echos so far: " << resp_.front().value << std::endl;
break;
default: /* Ignore */;
@@ -63,7 +61,6 @@ public:
void on_write(std::size_t n)
{
std::cout << "Number of bytes written: " << n << std::endl;
}
void on_push(std::size_t n) { }

View File

@@ -0,0 +1,51 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <iostream>
#include <boost/asio.hpp>
namespace net = boost::asio;
using net::ip::tcp;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using timer_type = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
net::awaitable<void> example(std::string msg, int n)
{
try {
auto ex = co_await net::this_coro::executor;
tcp::resolver resv{ex};
auto const res = resv.resolve("127.0.0.1", "55555");
tcp_socket socket{ex};
co_await socket.async_connect(*std::begin(res));
std::string buffer;
auto dbuffer = net::dynamic_buffer(buffer);
for (int i = 0; i < n; ++i) {
co_await net::async_write(socket, net::buffer(msg));
co_await net::async_read_until(socket, dbuffer, "\n");
//std::printf("Nada %s", buffer.data());
}
} catch (...) {
std::cerr << "Error" << std::endl;
}
}
int main()
{
try {
net::io_context ioc;
int n = 100;
for (int i = 0; i < 500; ++i)
net::co_spawn(ioc, example("Some message\n", n), net::detached);
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}