2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Implements async_ping.

This commit is contained in:
Marcelo Zimbres
2022-04-24 17:37:17 +02:00
parent b453fc63c9
commit 0363353154
3 changed files with 89 additions and 20 deletions

View File

@@ -313,6 +313,7 @@ private:
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
template <class T, class V> friend struct detail::reader_op;
template <class T, class V> friend struct detail::ping_op;
template <class T> friend struct detail::read_op;
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::write_op;
@@ -322,6 +323,7 @@ private:
template <class T> friend struct detail::check_idle_op;
template <class T> friend struct detail::init_op;
template <class T> friend struct detail::read_write_check_op;
template <class T> friend struct detail::wait_data_op;
void on_resolve()
{
@@ -517,6 +519,26 @@ private:
>(detail::read_write_check_op<client>{this}, token, read_timer_, write_timer_, wait_write_timer_, check_idle_timer_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_ping_after(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::ping_op<client, Command>{this}, token, read_timer_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_wait_for_data(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::wait_data_op<client>{this}, token, read_timer_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_check_idle(CompletionToken&& token = default_completion_token_type{})

View File

@@ -26,16 +26,68 @@ namespace detail {
#include <boost/asio/yield.hpp>
template <class Client, class Command>
struct ping_op {
Client* cli;
boost::asio::coroutine coro;
template <class Self>
void
operator()(Self& self, boost::system::error_code ec = {})
{
reenter (coro) {
// TODO: Prevent timeout zero.
cli->read_timer_.expires_after(cli->cfg_.idle_timeout / 2);
yield cli->read_timer_.async_wait(std::move(self));
if (ec) {
// operation_aborted: ok, not an error.
self.complete({});
return;
}
// The timer fired, send the ping.
cli->send(Command::ping);
self.complete({});
}
}
};
template <class Client>
struct wait_data_op {
Client* cli;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro) {
// Detached.
cli->async_ping_after([](boost::system::error_code ec){});
// Waits for incomming data.
yield boost::asio::async_read_until(*cli->socket_, boost::asio::dynamic_buffer(cli->read_buffer_, cli->cfg_.max_read_size), "\r\n", std::move(self));
// Cancels the async_ping_after.
cli->read_timer_.cancel();
if (ec) {
self.complete(ec);
return;
}
self.complete({});
}
}
};
template <class Client>
struct check_idle_op {
Client* cli;
boost::asio::coroutine coro;
template <class Self>
void
operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::resolver::results_type res = {})
void operator()(Self& self, boost::system::error_code ec = {})
{
reenter (coro) for(;;) {
@@ -295,9 +347,13 @@ struct write_op {
assert(!cli->requests_.empty());
cli->write_timer_.expires_after(cli->cfg_.write_timeout);
size = cli->info_.front().size;
// TODO: Add a sent flag to info to avoid using the size to
// check whether a request has already been sent.
cli->info_.front().size = 0;
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return boost::asio::async_write(*cli->socket_, boost::asio::buffer(cli->requests_.data(), cli->info_.front().size), token);},
[this](auto token) { return boost::asio::async_write(*cli->socket_, boost::asio::buffer(cli->requests_.data(), size), token);},
[this](auto token) { return cli->write_timer_.async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
@@ -323,12 +379,8 @@ struct write_op {
default: assert(false);
}
assert(n == cli->info_.front().size);
size = cli->info_.front().size;
cli->requests_.erase(0, cli->info_.front().size);
cli->info_.front().size = 0;
assert(n == size);
cli->requests_.erase(0, size);
if (cli->info_.front().cmds == 0)
cli->info_.erase(std::begin(cli->info_));
@@ -436,13 +488,7 @@ struct reader_op {
boost::ignore_unused(n);
if (cli->read_buffer_.empty()) {
yield
boost::asio::async_read_until(
*cli->socket_,
boost::asio::dynamic_buffer(cli->read_buffer_, cli->cfg_.max_read_size),
"\r\n",
std::move(self));
yield cli->async_wait_for_data(std::move(self));
if (ec) {
cli->on_reader_exit();
self.complete(ec);

View File

@@ -386,7 +386,7 @@ public:
{
if (!std::exchange(sent_, true)) {
db_->send(command::del, "key");
db_->send(command::blpop, "key");
db_->send(command::client, "PAUSE", 2000);
}
}
@@ -408,7 +408,7 @@ void test_idle()
cfg.connect_timeout = std::chrono::seconds{1};
cfg.read_timeout = std::chrono::seconds{1};
cfg.write_timeout = std::chrono::seconds{1};
cfg.idle_timeout = std::chrono::seconds{1};
cfg.idle_timeout = std::chrono::seconds{2};
client_type db(ioc.get_executor(), cfg);
receiver8 recv{db};
@@ -420,6 +420,7 @@ void test_idle()
int main()
{
// TODO: send client unpause before tests.
test_resolve_error();
test_connect_error();
test_hello();