2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Progresses with code quality.

This commit is contained in:
Marcelo Zimbres
2022-09-12 21:57:57 +02:00
parent a23a3db9ac
commit 798f193f14
9 changed files with 205 additions and 125 deletions

View File

@@ -48,7 +48,8 @@
"cacheVariables": {
"CMAKE_BUILD_TYPE": "Debug",
"CMAKE_CXX_EXTENSIONS": "OFF",
"CMAKE_CXX_FLAGS": "-Wall -Wextra",
"CMAKE_CXX_FLAGS": "-Wall -Wextra -fsanitize=address",
"CMAKE_SHARED_LINKER_FLAGS": "-fsanitize=address",
"CMAKE_CXX_STANDARD_REQUIRED": "ON",
"PROJECT_BINARY_DIR": "${sourceDir}/build/dev"
}

View File

@@ -33,7 +33,7 @@ void reconnect(connection& conn)
for (;;) {
boost::system::error_code ec;
conn.run(ep, req, adapt(), ec);
// TODO: Call conn.reset.
conn.reset_stream();
std::cout << ec.message() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{1});
}

View File

@@ -163,7 +163,8 @@ public:
* @li Performs a RESP3 handshake by sending a `HELLO` command
* with protocol version 3 and optionally credentials necessary
* for authentication, the last are read from the `endpoint`
* object. TODO: specify a timeout.
* object. The timeout used is that specified in
* `config::resp3_handshake_timeout`.
*
* @li Erase any password that are contained in
* `endpoint::password`.
@@ -195,12 +196,13 @@ public:
* @endcode
*/
template <class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_run(endpoint& ep, CompletionToken token = CompletionToken{})
auto async_run(endpoint ep, CompletionToken token = CompletionToken{})
{
ep_ = ep;
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::run_op<Derived>{&derived(), &ep}, token, resv_);
>(detail::run_op<Derived>{&derived()}, token, resv_);
}
/** @brief Connects and executes a request asynchronously.
@@ -226,7 +228,7 @@ public:
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_run(
endpoint& ep,
endpoint ep,
resp3::request const& req,
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
@@ -235,7 +237,7 @@ public:
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::runexec_op<Derived, Adapter>
{&derived(), &ep, &req, adapter}, token, resv_);
{&derived(), ep, &req, adapter}, token, resv_);
}
/** @brief Executes a command on the redis server asynchronously.
@@ -371,12 +373,12 @@ protected:
{ return boost::asio::dynamic_buffer(read_buffer_, derived().get_config().max_read_size); }
template <class CompletionToken>
auto async_resolve_with_timeout(endpoint& ep, CompletionToken&& token)
auto async_resolve_with_timeout(CompletionToken&& token)
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::resolve_with_timeout_op<Derived>{&derived(), &ep},
>(detail::resolve_with_timeout_op<Derived>{&derived()},
token, resv_);
}
@@ -501,6 +503,7 @@ protected:
resp3::request req_;
std::vector<resp3::node<std::string>> response_;
endpoint ep_;
};
} // aedis

View File

@@ -26,9 +26,6 @@
#include <aedis/resp3/write.hpp>
#include <aedis/resp3/request.hpp>
#define HANDLER_LOCATION \
BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, __func__))
namespace aedis::detail {
#include <boost/asio/yield.hpp>
@@ -57,7 +54,6 @@ struct connect_with_timeout_op {
template <class Conn>
struct resolve_with_timeout_op {
Conn* conn = nullptr;
endpoint* ep = nullptr;
boost::asio::coroutine coro{};
template <class Self>
@@ -71,7 +67,7 @@ struct resolve_with_timeout_op {
yield
aedis::detail::async_resolve(
conn->resv_, conn->ping_timer_,
ep->host, ep->port, std::move(self));
conn->ep_.host, conn->ep_.port, std::move(self));
conn->endpoints_ = res;
self.complete(ec);
}
@@ -381,7 +377,6 @@ struct start_op {
template <class Conn>
struct run_op {
Conn* conn = nullptr;
endpoint* ep = nullptr;
boost::asio::coroutine coro{};
template <class Self>
@@ -393,7 +388,7 @@ struct run_op {
reenter (coro)
{
yield
conn->async_resolve_with_timeout(*ep, std::move(self));
conn->async_resolve_with_timeout(std::move(self));
if (ec) {
conn->cancel(Conn::operation::run);
self.complete(ec);
@@ -408,7 +403,7 @@ struct run_op {
return;
}
conn->prepare_hello(*ep);
conn->prepare_hello(conn->ep_);
conn->ping_timer_.expires_after(conn->get_config().resp3_handshake_timeout);
yield
@@ -427,9 +422,9 @@ struct run_op {
return;
}
// TODO: Erase the password.
conn->ep_.password.clear();
if (!conn->expect_role(ep->role)) {
if (!conn->expect_role(conn->ep_.role)) {
conn->cancel(Conn::operation::run);
self.complete(error::unexpected_server_role);
return;
@@ -567,7 +562,7 @@ struct reader_op {
template <class Conn, class Adapter>
struct runexec_op {
Conn* conn = nullptr;
endpoint* ep = nullptr;
endpoint ep;
resp3::request const* req = nullptr;
Adapter adapter;
boost::asio::coroutine coro{};
@@ -585,7 +580,7 @@ struct runexec_op {
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return conn->async_run(*ep, token);},
[this, ep2 = ep](auto token) { return conn->async_run(ep2, token);},
[this](auto token) { return conn->async_exec(*req, adapter, token);}
).async_wait(
boost::asio::experimental::wait_for_one_error(),

View File

@@ -12,111 +12,16 @@
#include <boost/asio/io_context.hpp>
#include <aedis/connection_base.hpp>
#include <aedis/ssl/detail/connection_ops.hpp>
namespace aedis::ssl {
namespace detail
{
#include <boost/asio/yield.hpp>
template <class Stream>
struct handshake_op {
Stream* stream;
aedis::detail::conn_timer_t<typename Stream::executor_type>* timer;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token)
{
return stream->async_handshake(boost::asio::ssl::stream_base::client, token);
},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0: self.complete(ec1); return;
case 1:
{
BOOST_ASSERT_MSG(!ec2, "handshake_op: Incompatible state.");
self.complete(error::ssl_handshake_timeout);
return;
}
default: BOOST_ASSERT(false);
}
}
}
};
template <
class Stream,
class CompletionToken
>
auto async_handshake(
Stream& stream,
aedis::detail::conn_timer_t<typename Stream::executor_type>& timer,
CompletionToken&& token)
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(handshake_op<Stream>{&stream, &timer}, token, stream, timer);
}
template <class Conn>
struct ssl_connect_with_timeout_op {
Conn* conn = nullptr;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::endpoint const& = {})
{
reenter (coro)
{
conn->ping_timer_.expires_after(conn->get_config().connect_timeout);
yield
aedis::detail::async_connect(
conn->lowest_layer(), conn->ping_timer_, conn->endpoints_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
conn->ping_timer_.expires_after(conn->get_config().handshake_timeout);
yield
async_handshake(conn->next_layer(), conn->ping_timer_, std::move(self));
self.complete(ec);
}
}
};
#include <boost/asio/unyield.hpp>
} // detail
template <class>
class connection;
/** @brief Connection to the Redis server over SSL sockets.
* @ingroup any
*/
template <class>
class connection;
template <class AsyncReadWriteStream>
class connection<boost::asio::ssl::stream<AsyncReadWriteStream>> :
public connection_base<

View File

@@ -0,0 +1,112 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_SSL_CONNECTION_OPS_HPP
#define AEDIS_SSL_CONNECTION_OPS_HPP
#include <array>
#include <boost/assert.hpp>
#include <boost/system.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
namespace aedis::ssl::detail
{
#include <boost/asio/yield.hpp>
template <class Stream>
struct handshake_op {
Stream* stream;
aedis::detail::conn_timer_t<typename Stream::executor_type>* timer;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token)
{
return stream->async_handshake(boost::asio::ssl::stream_base::client, token);
},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0: self.complete(ec1); return;
case 1:
{
BOOST_ASSERT_MSG(!ec2, "handshake_op: Incompatible state.");
self.complete(error::ssl_handshake_timeout);
return;
}
default: BOOST_ASSERT(false);
}
}
}
};
template <
class Stream,
class CompletionToken
>
auto async_handshake(
Stream& stream,
aedis::detail::conn_timer_t<typename Stream::executor_type>& timer,
CompletionToken&& token)
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(handshake_op<Stream>{&stream, &timer}, token, stream, timer);
}
template <class Conn>
struct ssl_connect_with_timeout_op {
Conn* conn = nullptr;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::endpoint const& = {})
{
reenter (coro)
{
conn->ping_timer_.expires_after(conn->get_config().connect_timeout);
yield
aedis::detail::async_connect(
conn->lowest_layer(), conn->ping_timer_, conn->endpoints_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
conn->ping_timer_.expires_after(conn->get_config().handshake_timeout);
yield
async_handshake(conn->next_layer(), conn->ping_timer_, std::move(self));
self.complete(ec);
}
}
};
#include <boost/asio/unyield.hpp>
} // aedis::ssl::detail
#endif // AEDIS_SSL_CONNECTION_OPS_HPP

View File

@@ -23,16 +23,30 @@ namespace aedis {
template <class Connection>
class sync {
public:
// TODO: Add cancel and reset functions.
/// Config options from the underlying connection.
using config = typename Connection::config;
/// Operation options from the underlying connection.
using operation = typename Connection::operation;
/// The executor type of the underlysing connection.
using executor_type = typename Connection::executor_type;
/** @brief Constructor
*
* @param ex Executor
* @param cfg Config options.
*/
template <class Executor>
explicit sync(Executor ex, config cfg = config{}) : conn_{ex, cfg} { }
explicit sync(executor_type ex, config cfg = config{}) : conn_{ex, cfg} { }
/** @brief Constructor
*
* @param ex The io_context.
* @param cfg Config options.
*/
explicit sync(boost::asio::io_context& ioc, config cfg = config{})
: sync(ioc, std::move(cfg))
{ }
/** @brief Calls `async_exec` from the underlying connection object.
*
@@ -238,6 +252,55 @@ public:
return res;
}
/** @brief Calls `cancel` in the underlying connection object.
*
* @param op The operation to cancel.
* @returns The number of operations canceled.
*/
template <class ResponseAdapter>
auto cancel(operation op)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, op, &res, &sh]()
{
std::unique_lock ul(sh.mutex);
res = conn_.cancel(op);
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Calls `reset` in the underlying connection object.
*
* @param op The operation to cancel.
* @returns The number of operations canceled.
*/
void reset_stream()
{
sync_helper sh;
auto f = [this, &sh]()
{
std::unique_lock ul(sh.mutex);
conn_.reset_stream();
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
}
private:
struct sync_helper {
std::mutex mutex;

View File

@@ -76,13 +76,14 @@ BOOST_AUTO_TEST_CASE(test_idle)
db->get_config().resolve_timeout = 2 * ms;
db->get_config().connect_timeout = 2 * ms;
db->get_config().ping_interval = 2 * ms;
db->get_config().resp3_handshake_timeout = 2 * ms;
request req;
req.push("QUIT");
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, req, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, aedis::error::exec_timeout);
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
});
ioc.run();

View File

@@ -34,7 +34,7 @@ net::awaitable<void> test_reconnect_impl(std::shared_ptr<connection> db)
boost::system::error_code ec;
co_await db->async_run(ep, req, adapt(), net::redirect_error(net::use_awaitable, ec));
db->reset_stream();
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
}
BOOST_CHECK_EQUAL(i, 5);