2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Adds automatic AUTH and HELLO.

This commit is contained in:
Marcelo Zimbres
2022-08-01 22:46:34 +02:00
parent 20328cd423
commit 3ddb017edb
11 changed files with 218 additions and 27 deletions

View File

@@ -35,7 +35,6 @@ net::awaitable<void> reader(std::shared_ptr<connection> db)
{
try {
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "chat-channel");
co_await db->async_exec(req);

View File

@@ -28,7 +28,6 @@ int main()
{{"key1", 10}, {"key2", 20}, {"key3", 30}};
request req;
req.push("HELLO", 3);
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);
req.push("MULTI");
@@ -38,7 +37,6 @@ int main()
req.push("QUIT");
std::tuple<
aedis::ignore, // hello
aedis::ignore, // rpush
aedis::ignore, // hset
aedis::ignore, // multi
@@ -55,6 +53,6 @@ int main()
});
ioc.run();
print(std::get<0>(std::get<6>(resp)).value());
print(std::get<1>(std::get<6>(resp)).value());
print(std::get<0>(std::get<5>(resp)).value());
print(std::get<1>(std::get<5>(resp)).value());
}

View File

@@ -50,10 +50,6 @@ awaitable_type listener()
auto db = std::make_shared<connection>(ex);
db->async_run(net::detached);
request req;
req.push("HELLO", 3);
co_await db->async_exec(req);
tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555});
for (;;)
net::co_spawn(ex, echo_loop(co_await acc.async_accept(), db), net::detached);

View File

@@ -24,16 +24,15 @@ int main()
connection db{ioc};
request req;
req.push("HELLO", 3);
req.push("PING");
req.push("QUIT");
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
std::tuple<std::string, aedis::ignore> resp;
db.async_run(req, adapt(resp), [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});
ioc.run();
std::cout << std::get<1>(resp) << std::endl;
std::cout << std::get<0>(resp) << std::endl;
}

View File

@@ -19,6 +19,7 @@ using aedis::resp3::request;
using aedis::experimental::exec;
using connection = aedis::connection<>;
// TODO: What is causing the delay?
int main()
{
try {
@@ -33,7 +34,6 @@ int main()
}));
request req;
req.push("HELLO", 3);
req.push("PING");
req.push("QUIT");

View File

@@ -67,7 +67,6 @@ net::awaitable<void> reconnect(std::shared_ptr<connection> db)
auto ex = co_await net::this_coro::executor;
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
net::steady_timer timer{ex};

View File

@@ -18,6 +18,7 @@ nobase_include_HEADERS =\
$(top_srcdir)/include/aedis/resp3/detail/parser.hpp\
$(top_srcdir)/include/aedis/resp3/type.hpp\
$(top_srcdir)/include/aedis/resp3/read.hpp\
$(top_srcdir)/include/aedis/resp3/exec.hpp\
$(top_srcdir)/include/aedis/resp3/write.hpp\
$(top_srcdir)/include/aedis/resp3/request.hpp\
$(top_srcdir)/include/aedis/resp3/impl/request.ipp\

View File

@@ -53,16 +53,16 @@ public:
*/
struct config {
/// The Redis server address.
boost::string_view host = "127.0.0.1";
std::string host = "127.0.0.1";
/// The Redis server port.
boost::string_view port = "6379";
std::string port = "6379";
/// Username if authentication is required.
boost::string_view username;
std::string username;
/// Password if authentication is required.
boost::string_view password;
std::string password;
/// Timeout of the resolve operation.
std::chrono::milliseconds resolve_timeout = std::chrono::seconds{10};

View File

@@ -24,6 +24,7 @@
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/write.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/resp3/exec.hpp>
#define HANDLER_LOCATION \
BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__))
@@ -382,7 +383,10 @@ struct run_op {
boost::asio::coroutine coro{};
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
void operator()(
Self& self,
boost::system::error_code ec = {},
std::size_t = 0)
{
reenter (coro)
{
@@ -402,6 +406,31 @@ struct run_op {
return;
}
conn->req_.clear();
if (!std::empty(conn->cfg_.username) && !std::empty(conn->cfg_.password))
conn->req_.push("AUTH", conn->cfg_.username, conn->cfg_.password);
conn->req_.push("HELLO", "3");
conn->ping_timer_.expires_after(conn->cfg_.ping_interval);
// TODO: This is going to consume on the first response i.e.
// AUTH, we also have to consume the HELLO.
yield
resp3::async_exec(
*conn->socket_,
conn->ping_timer_,
conn->req_,
adapter::adapt(),
conn->make_dynamic_buffer(),
std::move(self)
);
if (ec) {
conn->cancel_run();
self.complete(ec);
return;
}
std::for_each(std::begin(conn->reqs_), std::end(conn->reqs_), [](auto const& ptr) {
return ptr->written = false;
});

View File

@@ -0,0 +1,178 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_RESP3_EXEC_HPP
#define AEDIS_RESP3_EXEC_HPP
#include <boost/assert.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/request.hpp>
namespace aedis {
namespace resp3 {
namespace detail {
#include <boost/asio/yield.hpp>
template <
class AsyncStream,
class Adapter,
class DynamicBuffer
>
struct exec_op {
AsyncStream* socket = nullptr;
request const* req = nullptr;
Adapter adapter;
DynamicBuffer dbuf{};
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
{
yield
boost::asio::async_write(
*socket,
boost::asio::buffer(req->payload()),
std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
yield resp3::async_read(*socket, dbuf, adapter, std::move(self));
self.complete(ec, n);
}
}
};
#include <boost/asio/unyield.hpp>
} // detail
template <
class AsyncStream,
class Adapter,
class DynamicBuffer,
class CompletionToken = boost::asio::default_completion_token_t<typename AsyncStream::executor_type>
>
auto async_exec(
AsyncStream& socket,
request const& req,
Adapter adapter,
DynamicBuffer dbuf,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<AsyncStream, Adapter, DynamicBuffer>
{&socket, &req, adapter, dbuf}, token, socket);
}
namespace detail {
#include <boost/asio/yield.hpp>
template <
class AsyncStream,
class Timer,
class Adapter,
class DynamicBuffer
>
struct exec_with_timeout_op {
AsyncStream* socket = nullptr;
Timer* timer = nullptr;
request const* req = nullptr;
Adapter adapter;
DynamicBuffer dbuf{};
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, std::size_t n = 0
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return resp3::async_exec(*socket, *req, adapter, dbuf, token);},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, 0);
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(aedis::error::idle_timeout, 0);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, n);
}
}
};
#include <boost/asio/unyield.hpp>
} // detail
template <
class AsyncStream,
class Timer,
class Adapter,
class DynamicBuffer,
class CompletionToken = boost::asio::default_completion_token_t<typename AsyncStream::executor_type>
>
auto async_exec(
AsyncStream& socket,
Timer& timer,
request const& req,
Adapter adapter,
DynamicBuffer dbuf,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_with_timeout_op<AsyncStream, Timer, Adapter, DynamicBuffer>
{&socket, &timer, &req, adapter, dbuf}, token, socket, timer);
}
} // resp3
} // aedis
#endif // AEDIS_RESP3_EXEC_HPP

View File

@@ -78,7 +78,6 @@ void test_quit1(connection::config const& cfg)
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("HELLO", 3);
req.push("QUIT");
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
@@ -96,7 +95,6 @@ void test_quit2(connection::config const& cfg)
{
std::cout << "test_quit2" << std::endl;
request req;
req.push("HELLO", 3);
req.push("QUIT");
net::io_context ioc;
@@ -133,7 +131,6 @@ void test_missing_push_reader1(connection::config const& cfg)
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
db->async_run(req, aedis::adapt(), [](auto ec, auto){
@@ -150,7 +147,6 @@ void test_missing_push_reader2(connection::config const& cfg)
auto db = std::make_shared<connection>(ioc, cfg);
request req; // Wrong command syntax.
req.push("HELLO", 3);
req.push("SUBSCRIBE");
db->async_run(req, aedis::adapt(), [](auto ec, auto){
@@ -167,7 +163,6 @@ void test_missing_push_reader3(connection::config const& cfg)
auto db = std::make_shared<connection>(ioc, cfg);
request req; // Wrong command synthax.
req.push("HELLO", 3);
req.push("PING", "Message");
req.push("SUBSCRIBE");
@@ -190,7 +185,6 @@ void test_idle()
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("HELLO", 3);
req.push("CLIENT", "PAUSE", 5000);
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
@@ -227,7 +221,6 @@ void test_push_is_received1(connection::config const& cfg)
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
req.push("QUIT");
@@ -251,7 +244,6 @@ void test_push_is_received2(connection::config const& cfg)
{
std::cout << "test_push_is_received2" << std::endl;
request req1;
req1.push("HELLO", 3);
req1.push("PING", "Message1");
request req2;