2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Implements automatic hello.

This commit is contained in:
Marcelo Zimbres
2022-05-24 22:36:16 +02:00
parent 7300f1498b
commit 3dff0b78de
9 changed files with 131 additions and 220 deletions

View File

@@ -102,14 +102,12 @@ public:
, ping_timer_{ex}
, write_timer_{ex}
, wait_write_timer_{ex}
, check_idle_timer_{ex}
, idle_check_timer{ex}
, read_ch_{ex}
, push_ch_{ex}
, cfg_{cfg}
, adapter_{adapter}
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
, type_{resp3::type::invalid}
, cmd_info_{std::make_pair<Command>(Command::invalid, 0)}
{
}
@@ -217,7 +215,7 @@ public:
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t, std::size_t)
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<client>{this, &req}, token, read_timer_);
}
@@ -269,17 +267,15 @@ private:
template <class T, class V> friend struct detail::reader_op;
template <class T, class V> friend struct detail::ping_op;
template <class T> friend struct detail::read_with_timeout_op;
template <class T> friend struct detail::read_op;
template <class T, class V> friend struct detail::read_with_timeout_op;
template <class T, class V> friend struct detail::run_op;
template <class T, class V> friend struct detail::exec_op2;
template <class T> friend struct detail::write_op;
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::write_with_timeout_op;
template <class T> friend struct detail::connect_op;
template <class T> friend struct detail::connect_with_timeout_op;
template <class T> friend struct detail::resolve_op;
template <class T> friend struct detail::resolve_with_timeout_op;
template <class T> friend struct detail::check_idle_op;
template <class T> friend struct detail::idle_check_op;
template <class T> friend struct detail::read_write_check_ping_op;
template <class T> friend struct detail::exec_op;
@@ -288,13 +284,13 @@ private:
return boost::asio::dynamic_buffer(read_buffer_, cfg_.max_read_size);
}
auto wrap_adapter()
auto select_adapter(Command cmd)
{
return [this]
return [this, cmd]
(resp3::node<boost::string_view> const& nd,
boost::system::error_code& ec) mutable
{
adapter_(cmd_info_.first, nd, ec);
adapter_(cmd, nd, ec);
};
}
@@ -329,18 +325,6 @@ private:
return can_write;
}
// Resolves the address passed in async_run and store the results
// in the endpoints_ member variable.
template <class CompletionToken = default_completion_token_type>
auto
async_resolve(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::resolve_op<client>{this}, token, resv_.get_executor());
}
// Calls client::async_resolve with the resolve timeout passed in
// the config. Uses the write_timer_ to perform the timeout op.
template <class CompletionToken = default_completion_token_type>
@@ -355,19 +339,6 @@ private:
token, resv_.get_executor());
}
// Connects the socket to one of the endpoints in endpoints_ and
// stores the successful endpoint in endpoint_.
template <class CompletionToken = default_completion_token_type>
auto
async_connect(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::connect_op<client>{this}, token,
write_timer_.get_executor());
}
// Calls client::async_connect with a timeout.
template <class CompletionToken = default_completion_token_type>
auto
@@ -385,22 +356,15 @@ private:
// timeout config::read_timeout.
template <class CompletionToken = default_completion_token_type>
auto
async_read_with_timeout(CompletionToken&& token = default_completion_token_type{})
async_read_with_timeout(
Command cmd,
CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::read_with_timeout_op<client>{this}, token, read_timer_.get_executor());
}
template <class CompletionToken = default_completion_token_type>
auto
async_read(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::read_op<client>{this}, token, read_timer_.get_executor());
, void(boost::system::error_code, std::size_t)
>(detail::read_with_timeout_op<client, Command>{this, cmd},
token, read_timer_.get_executor());
}
// Loops on async_read_with_timeout described above.
@@ -420,7 +384,7 @@ private:
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
, void(boost::system::error_code, std::size_t)
>(detail::write_op<client>{this}, token, write_timer_);
}
@@ -432,7 +396,7 @@ private:
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
, void(boost::system::error_code, std::size_t)
>(detail::write_with_timeout_op<client>{this}, token, write_timer_);
}
@@ -453,7 +417,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::read_write_check_ping_op<client>{this}, token, read_timer_, write_timer_, wait_write_timer_, check_idle_timer_);
>(detail::read_write_check_ping_op<client>{this}, token, read_timer_, write_timer_, wait_write_timer_, idle_check_timer);
}
template <class CompletionToken = default_completion_token_type>
@@ -468,37 +432,34 @@ private:
template <class CompletionToken = default_completion_token_type>
auto
async_check_idle(CompletionToken&& token = default_completion_token_type{})
async_idle_check(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::check_idle_op<client>{this}, token, check_idle_timer_);
>(detail::idle_check_op<client>{this}, token, idle_check_timer);
}
// Used to resolve the host on async_resolve.
template <class CompletionToken = default_completion_token_type>
auto async_exec2(
request_type& req,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::exec_op2<client, request_type>{this, &req},
token, read_timer_);
}
// IO objects
boost::asio::ip::tcp::resolver resv_;
// The tcp socket.
std::shared_ptr<AsyncReadWriteStream> socket_;
// Timer used with async_read.
boost::asio::steady_timer read_timer_;
// Ping timer.
boost::asio::steady_timer ping_timer_;
// Timer used with async_write_with_timeout.
boost::asio::steady_timer write_timer_;
// Timer that is canceled when a new message is added to the output
// queue.
boost::asio::steady_timer wait_write_timer_;
// Check idle timer.
boost::asio::steady_timer check_idle_timer_;
// Channel used to communicate read events.
boost::asio::steady_timer idle_check_timer;
read_channel_type read_ch_;
read_channel_type push_ch_;
@@ -525,18 +486,10 @@ private:
// Last time we received data.
time_point_type last_data_;
// Used by the read_with_timeout_op.
resp3::type type_;
typename request_type::command_info_type cmd_info_;
// See async_connect.
boost::asio::ip::tcp::endpoint endpoint_;
// See async_resolve.
boost::asio::ip::tcp::resolver::results_type endpoints_;
// write_op helper.
std::size_t bytes_written_ = 0;
request_type req_;
};

View File

@@ -29,6 +29,43 @@ namespace detail {
#include <boost/asio/yield.hpp>
template <class Client, class Request>
struct exec_op2 {
Client* cli;
typename Client::request_type* req;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
{
yield
boost::asio::async_write(
*cli->socket_,
boost::asio::buffer(req->payload()),
std::move(self));
if (ec) {
self.complete(ec);
return;
}
// Say hello and ignores the response.
yield
resp3::async_read(
*cli->socket_,
cli->make_dynamic_buffer(),
[](resp3::node<boost::string_view> const&, boost::system::error_code&) { },
std::move(self));
self.complete(ec);
}
}
};
template <class Client>
struct exec_op {
Client* cli;
@@ -51,7 +88,7 @@ struct exec_op {
cli->reqs_.back().channel.async_receive(std::move(self));
if (ec) {
self.complete(ec, 0, 0);
self.complete(ec, 0);
return;
}
@@ -63,7 +100,7 @@ struct exec_op {
while (!cli->reqs_.front().req->commands().empty()) {
yield cli->read_ch_.async_receive(std::move(self));
if (ec) {
self.complete(ec, 0, 0);
self.complete(ec, 0);
return;
}
@@ -78,7 +115,7 @@ struct exec_op {
if (!cli->reqs_.empty())
cli->wait_write_timer_.cancel_one();
self.complete({}, cli->bytes_written_, read_size);
self.complete({}, read_size);
return;
}
}
@@ -93,7 +130,6 @@ struct ping_op {
void
operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t write_size = 0
, std::size_t read_size = 0)
{
reenter (coro) for (;;)
@@ -125,7 +161,7 @@ struct ping_op {
};
template <class Client>
struct check_idle_op {
struct idle_check_op {
Client* cli;
boost::asio::coroutine coro;
@@ -134,8 +170,8 @@ struct check_idle_op {
{
reenter (coro) for (;;)
{
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_delay_timeout);
yield cli->check_idle_timer_.async_wait(std::move(self));
cli->idle_check_timer.expires_after(2 * cli->cfg_.ping_delay_timeout);
yield cli->idle_check_timer.async_wait(std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -153,66 +189,6 @@ struct check_idle_op {
}
};
template <class Client>
struct resolve_op {
Client* cli;
boost::asio::coroutine coro;
template <class Self>
void
operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::resolver::results_type res = {})
{
reenter (coro)
{
yield
cli->resv_.async_resolve(
cli->cfg_.host.data(),
cli->cfg_.port.data(),
std::move(self));
if (ec) {
self.complete(ec);
return;
}
cli->endpoints_ = res;
self.complete({});
}
}
};
template <class Client>
struct connect_op {
Client* cli;
boost::asio::coroutine coro;
template <class Self>
void
operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::endpoint const& ep = {})
{
reenter (coro)
{
yield
boost::asio::async_connect(
*cli->socket_,
cli->endpoints_,
std::move(self));
if (ec) {
self.complete(ec);
return;
}
cli->endpoint_ = ep;
self.complete({});
}
}
};
template <class Client>
struct resolve_with_timeout_op {
Client* cli;
@@ -222,6 +198,7 @@ struct resolve_with_timeout_op {
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::asio::ip::tcp::resolver::results_type res = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
@@ -232,7 +209,7 @@ struct resolve_with_timeout_op {
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->async_resolve(token);},
[this](auto token) { return cli->resv_.async_resolve(cli->cfg_.host.data(), cli->cfg_.port.data(), token);},
[this](auto token) { return cli->write_timer_.async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
@@ -258,6 +235,7 @@ struct resolve_with_timeout_op {
default: BOOST_ASSERT(false);
}
cli->endpoints_ = res;
self.complete({});
}
}
@@ -272,6 +250,7 @@ struct connect_with_timeout_op {
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::asio::ip::tcp::endpoint const& ep = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
@@ -282,7 +261,7 @@ struct connect_with_timeout_op {
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->async_connect(token);},
[this](auto token) { return boost::asio::async_connect(*cli->socket_, cli->endpoints_, token);},
[this](auto token) { return cli->write_timer_.async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
@@ -335,7 +314,7 @@ struct read_write_check_ping_op {
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->writer(token);},
[this](auto token) { return cli->reader(token);},
[this](auto token) { return cli->async_check_idle(token);},
[this](auto token) { return cli->async_idle_check(token);},
[this](auto token) { return cli->async_ping(token);}
).async_wait(
boost::asio::experimental::wait_for_one_error(),
@@ -395,7 +374,13 @@ struct run_op {
return;
}
// TODO: Send hello here.
cli->req_.clear();
cli->req_.push(Command::hello, 3);
yield cli->async_exec2(cli->req_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
yield cli->async_read_write_check_ping(std::move(self));
if (ec) {
@@ -429,7 +414,7 @@ struct write_op {
std::move(self));
if (ec) {
self.complete(ec);
self.complete(ec, 0);
return;
}
@@ -444,8 +429,7 @@ struct write_op {
boost::asio::buffer(cli->reqs_.front().req->payload()),
std::move(self));
cli->bytes_written_ = n;
self.complete(ec);
self.complete(ec, n);
}
}
};
@@ -459,6 +443,7 @@ struct write_with_timeout_op {
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, std::size_t n = 0
, boost::system::error_code ec2 = {})
{
reenter (coro)
@@ -477,7 +462,7 @@ struct write_with_timeout_op {
case 0:
{
if (ec1) {
self.complete(ec1);
self.complete(ec1, 0);
return;
}
} break;
@@ -485,7 +470,7 @@ struct write_with_timeout_op {
case 1:
{
if (!ec2) {
self.complete(generic::error::write_timeout);
self.complete(generic::error::write_timeout, 0);
return;
}
} break;
@@ -493,7 +478,7 @@ struct write_with_timeout_op {
default: BOOST_ASSERT(false);
}
self.complete({});
self.complete({}, n);
}
}
};
@@ -504,7 +489,10 @@ struct writer_op {
boost::asio::coroutine coro;
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
void
operator()(Self& self,
boost::system::error_code ec = {},
std::size_t n = 0)
{
reenter (coro) for (;;)
{
@@ -520,7 +508,7 @@ struct writer_op {
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(!cli->reqs_.front().req->empty());
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(cli->bytes_written_ == cli->reqs_.front().req->size());
BOOST_ASSERT(n == cli->reqs_.front().req->size());
yield cli->wait_write_timer_.async_wait(std::move(self));
@@ -532,46 +520,17 @@ struct writer_op {
}
};
template <class Client>
struct read_op {
Client* cli;
boost::asio::coroutine coro;
template <class Self>
void
operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
{
yield
resp3::async_read(
*cli->socket_,
cli->make_dynamic_buffer(),
cli->wrap_adapter(),
std::move(self));
if (ec) {
self.complete(ec);
return;
}
cli->cmd_info_.second = n;
self.complete({});
}
}
};
template <class Client>
template <class Client, class Command>
struct read_with_timeout_op {
Client* cli;
Command cmd;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, std::size_t n = 0
, boost::system::error_code ec2 = {})
{
reenter (coro)
@@ -580,7 +539,7 @@ struct read_with_timeout_op {
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->async_read(token);},
[this](auto token) { return resp3::async_read(*cli->socket_, cli->make_dynamic_buffer(), cli->select_adapter(cmd), token);},
[this](auto token) { return cli->read_timer_.async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
@@ -590,7 +549,7 @@ struct read_with_timeout_op {
case 0:
{
if (ec1) {
self.complete(ec1);
self.complete(ec1, 0);
return;
}
} break;
@@ -598,7 +557,7 @@ struct read_with_timeout_op {
case 1:
{
if (!ec2) {
self.complete(generic::error::read_timeout);
self.complete(generic::error::read_timeout, 0);
return;
}
} break;
@@ -606,7 +565,7 @@ struct read_with_timeout_op {
default: BOOST_ASSERT(false);
}
self.complete({});
self.complete({}, n);
}
}
};
@@ -614,6 +573,8 @@ struct read_with_timeout_op {
template <class Client, class Command>
struct reader_op {
Client* cli;
resp3::type type_ = resp3::type::invalid;
Command cmd_ = Command::invalid;
boost::asio::coroutine coro;
template <class Self>
@@ -646,34 +607,35 @@ struct reader_op {
// channel. The only way to detect them is check whether the
// queue is empty.
BOOST_ASSERT(!cli->read_buffer_.empty());
cli->type_ = resp3::to_type(cli->read_buffer_.front());
cli->cmd_info_ = std::make_pair(Command::invalid, 0);
if (cli->type_ != resp3::type::push) {
type_ = resp3::to_type(cli->read_buffer_.front());
cmd_ = Command::invalid;
if (type_ != resp3::type::push) {
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(!cli->reqs_.front().req->commands().empty());
cli->cmd_info_ = cli->reqs_.front().req->commands().front();
cmd_ = cli->reqs_.front().req->commands().front().first;
}
cli->last_data_ = std::chrono::steady_clock::now();
yield cli->async_read_with_timeout(std::move(self));
yield
cli->async_read_with_timeout(cmd_, std::move(self));
if (ec) {
cli->close();
self.complete(ec);
return;
}
if (cli->cmd_info_.first == Command::invalid) {
if (cmd_ == Command::invalid) {
yield
cli->push_ch_.async_send(
boost::system::error_code{},
cli->cmd_info_.second,
n,
std::move(self));
} else {
yield
cli->read_ch_.async_send(
boost::system::error_code{},
cli->cmd_info_.second,
n,
std::move(self));
}

View File

@@ -46,6 +46,7 @@ namespace generic {
template <class Command>
class request {
public:
using command_type = Command;
using command_info_type = std::pair<Command, std::size_t>;
private:

View File

@@ -145,7 +145,7 @@ int main()
request<command> req;
req.push(command::hello, 3);
req.push(command::subscribe, "channel");
db->async_exec(req, [](auto, auto, auto){});
db->async_exec(req, [](auto, auto){});
auto resp = std::make_shared<response_type>();
db->set_adapter(adapt(*resp));

View File

@@ -74,7 +74,7 @@ int main()
// Sends hello and ignores the response.
request<command> req;
req.push(command::hello, 3);
db->async_exec(req, [](auto, auto, auto){});
db->async_exec(req, [](auto, auto){});
// TCP acceptor.
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};

View File

@@ -24,9 +24,9 @@ auto run_handler =[](auto ec)
std::printf("Run: %s\n", ec.message().data());
};
auto exec_handler = [](auto ec, std::size_t write_size, std::size_t read_size)
auto exec_handler = [](auto ec, std::size_t read_size)
{
std::printf("Exec: %s %u %u\n", ec.message().data(), write_size, read_size);
std::printf("Exec: %s %lu\n", ec.message().data(), read_size);
};
int main()
@@ -38,10 +38,6 @@ int main()
client_type db{ioc.get_executor()};
db.set_adapter(adapt(resp));
request<command> req1;
req1.push(command::hello, 3);
db.async_exec(req1, exec_handler);
request<command> req2;
req2.push(command::set, "intro-key", "message1");
req2.push(command::get, "intro-key");

View File

@@ -72,9 +72,9 @@ auto on_run =[](auto ec)
std::printf("Run: %s\n", ec.message().data());
};
auto on_exec = [](auto ec, std::size_t write_size, std::size_t read_size)
auto on_exec = [](auto ec, std::size_t read_size)
{
std::printf("Exec: %s %u %u\n", ec.message().data(), write_size, read_size);
std::printf("Exec: %s %lu\n", ec.message().data(), read_size);
};
int main()
@@ -87,7 +87,6 @@ int main()
client_type db{ioc.get_executor(), adapter{resp0, resp1, resp2}};
request<command> req;
req.push(command::hello, 3);
req.push_range(command::rpush, "rpush-key", vec);
req.push_range(command::sadd, "sadd-key", set);
req.push_range(command::hset, "hset-key", map);

View File

@@ -67,7 +67,7 @@ int main()
request<command> req;
req.push(command::hello, 3);
req.push(command::subscribe, "channel1", "channel2");
db->async_exec(req, [&](auto, auto, auto){req.clear();});
db->async_exec(req, [&](auto, auto){req.clear();});
db->async_run(run_handler);
ioc.run();

View File

@@ -85,7 +85,7 @@ void test_quit()
request<command> req;
req.push(command::hello, 3);
req.push(command::quit);
db->async_exec(req, [](auto ec, auto w, auto r){
db->async_exec(req, [](auto ec, auto r){
expect_no_error(ec);
//expect_eq(w, 36UL);
//expect_eq(r, 152UL);
@@ -124,7 +124,7 @@ void test_push()
req.push(command::subscribe, "channel");
req.push(command::quit);
db->async_exec(req, [](auto ec, auto w, auto r){
db->async_exec(req, [](auto ec, auto r){
expect_no_error(ec);
//expect_eq(w, 68UL);
//expect_eq(r, 151UL);
@@ -147,7 +147,7 @@ net::awaitable<void> run5(std::shared_ptr<client_type> db)
request<command> req;
req.push(command::hello, 3);
req.push(command::quit);
db->async_exec(req, [](auto ec, auto, auto){
db->async_exec(req, [](auto ec, auto){
expect_no_error(ec);
});
@@ -159,7 +159,7 @@ net::awaitable<void> run5(std::shared_ptr<client_type> db)
request<command> req;
req.push(command::hello, 3);
req.push(command::quit);
db->async_exec(req, [](auto ec, auto, auto){
db->async_exec(req, [](auto ec, auto){
expect_no_error(ec);
});
@@ -194,7 +194,7 @@ void test_idle()
req.push(command::hello, 3);
req.push(command::client, "PAUSE", 5000);
db->async_exec(req, [](auto ec, auto w, auto r){
db->async_exec(req, [](auto ec, auto r){
expect_no_error(ec);
});