2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Server role check and progresses with TLS.

This commit is contained in:
Marcelo Zimbres
2022-09-10 16:28:42 +02:00
parent d8607af669
commit e2d642f34c
13 changed files with 223 additions and 60 deletions

View File

@@ -13,7 +13,15 @@
* Adds example on how to resolve addresses over sentinels, see
subscriber_sentinel.cpp.
* Adds `endpoint` class.
* Adds `endpoint` where in addition to host and port, users can also
optionally provide username and password that are needed to connect
to the Redis server and the expected server role (see
`error::unexpected_server_role`).
* `connection::async_run` checks whether the server role received in
the hello command is equal to the expected server role specified in
the the `aedis::endpoint`. To skip this check let the role variable
empty.
* Removes reconnect functionanlity from the `connection` class. It is
possible in simple reconnection strategies but bloats the
@@ -25,8 +33,6 @@
* Fixes a bug in `connection::async_receive_push` that prevented
passing any response adapter other that `adapt(std::vector<node>)`.
* Ports the buildsystem from autotools to CMake.
* Changes the behaviour of `aedis::adapt()` that caused RESP3 errors
to be ignored. One consequence of it is that `connection::async_run`
would not exit with failure in servers that required authentication.
@@ -35,6 +41,8 @@
to complete with success when an error in the
`connection::async_exec` occurred.
* Moves the buildsystem from autotools to CMake.
## v1.0.0
* Adds experimental cmake support for windows users.

View File

@@ -62,7 +62,6 @@ net::awaitable<void> reconnect(std::shared_ptr<connection> db)
}
}
// Publishes messages to other users.
net::awaitable<void> publisher(stream_descriptor& in, std::shared_ptr<connection> db)
{

View File

@@ -42,25 +42,29 @@ awaitable_type echo_loop(tcp_socket socket, std::shared_ptr<connection> db)
}
}
// TODO: Print the async_run error.
awaitable_type listener()
awaitable_type listener(std::shared_ptr<connection> db)
{
auto ex = co_await net::this_coro::executor;
auto db = std::make_shared<connection>(ex);
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, net::detached);
tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555});
for (;;)
net::co_spawn(ex, echo_loop(co_await acc.async_accept(), db), net::detached);
}
// TODO: Perform signal handling.
auto main() -> int
{
try {
net::io_context ioc{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO};
co_spawn(ioc, listener(), net::detached);
auto db = std::make_shared<connection>(ioc);
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, [&](auto const& ec) {
std::clog << ec.message() << std::endl;
ioc.stop();
});
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });
co_spawn(ioc, listener(db), net::detached);
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;

View File

@@ -45,14 +45,15 @@ auto main() -> int
std::tuple<std::string, aedis::ignore> resp;
endpoint ep{"127.0.0.1", "6379"};
db.async_run(ep, req, adapt(resp), [](auto ec, auto) {
db.async_run(ep, req, adapt(resp), [&](auto ec, auto) {
std::cout << ec.message() << std::endl;
db.close();
});
ioc.run();
std::cout << std::get<0>(resp) << std::endl;
} catch (...) {
std::cerr << "Error" << std::endl;
std::cout << "----> " << std::get<0>(resp) << std::endl;
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
}

View File

@@ -32,6 +32,8 @@ public:
using base_type = connection_base<executor_type, connection<AsyncReadWriteStream>>;
using this_type = connection<next_layer_type>;
/** @brief Connection configuration parameters.
*/
struct config {
@@ -90,16 +92,13 @@ private:
template <class> friend struct detail::connect_with_timeout_op;
template <class> friend struct detail::run_op;
template <
class EndpointSequence,
class CompletionToken
>
auto async_connect(
detail::conn_timer_t<executor_type>& timer,
EndpointSequence ep,
CompletionToken&& token)
template <class CompletionToken>
auto async_connect(CompletionToken&& token)
{
return detail::async_connect(next_layer(), timer, ep, std::move(token));
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::connect_with_timeout_op<this_type>{this}, token, stream_);
}
void close() { stream_.close(); }

View File

@@ -304,7 +304,7 @@ public:
/// @}
private:
protected:
using clock_type = std::chrono::steady_clock;
using clock_traits_type = boost::asio::wait_traits<clock_type>;
using timer_type = boost::asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
@@ -333,7 +333,6 @@ private:
template <class, class> friend struct detail::exec_op;
template <class, class> friend struct detail::exec_read_op;
template <class, class> friend struct detail::runexec_op;
template <class> friend struct detail::connect_with_timeout_op;
template <class> friend struct detail::resolve_with_timeout_op;
template <class> friend struct detail::check_idle_op;
template <class> friend struct detail::start_op;
@@ -372,15 +371,6 @@ private:
token, resv_);
}
template <class CompletionToken>
auto async_connect_with_timeout(CompletionToken&& token)
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::connect_with_timeout_op<Derived>{&derived()}, token, resv_);
}
template <class CompletionToken>
auto reader(CompletionToken&& token)
{
@@ -451,6 +441,36 @@ private:
}
}
void prepare_hello(endpoint const& ep)
{
req_.clear();
if (requires_auth(ep)) {
req_.push("HELLO", "3", "AUTH", ep.username, ep.password);
} else {
req_.push("HELLO", "3");
}
}
bool expect_role(std::string const& expected)
{
if (std::empty(expected))
return true;
resp3::node<std::string> role_node;
role_node.data_type = resp3::type::blob_string;
role_node.aggregate_size = 1;
role_node.depth = 1;
role_node.value = "role";
auto iter = std::find(std::cbegin(response_), std::cend(response_), role_node);
if (iter == std::end(response_))
return false;
++iter;
BOOST_ASSERT(iter != std::cend(response_));
return iter->value == expected;
}
// IO objects
resolver_type resv_;
timer_type ping_timer_;
@@ -471,6 +491,7 @@ private:
boost::asio::ip::tcp::resolver::results_type endpoints_;
resp3::request req_;
std::vector<resp3::node<std::string>> response_;
};
} // aedis

View File

@@ -35,7 +35,7 @@ namespace aedis::detail {
template <class Conn>
struct connect_with_timeout_op {
Conn* conn;
Conn* conn = nullptr;
boost::asio::coroutine coro{};
template <class Self>
@@ -47,7 +47,8 @@ struct connect_with_timeout_op {
{
conn->ping_timer_.expires_after(conn->get_config().connect_timeout);
yield
conn->derived().async_connect(conn->ping_timer_, conn->endpoints_, std::move(self));
detail::async_connect(
conn->next_layer(), conn->ping_timer_, conn->endpoints_, std::move(self));
self.complete(ec);
}
}
@@ -400,37 +401,38 @@ struct run_op {
}
yield
conn->async_connect_with_timeout(std::move(self));
conn->derived().async_connect(std::move(self));
if (ec) {
conn->cancel(Conn::operation::run);
self.complete(ec);
return;
}
conn->req_.clear();
if (requires_auth(*ep)) {
conn->req_.push("HELLO", "3", "AUTH", ep->username, ep->password);
} else {
conn->req_.push("HELLO", "3");
}
conn->prepare_hello(*ep);
conn->ping_timer_.expires_after(conn->get_config().ping_interval);
yield
async_exec(
resp3::detail::async_exec(
conn->next_layer(),
conn->ping_timer_,
conn->req_,
adapter::adapt2(),
adapter::adapt2(conn->response_),
conn->make_dynamic_buffer(),
std::move(self)
);
if (ec) {
conn->cancel(Conn::operation::run);
self.complete(ec);
return;
}
if (!conn->expect_role(ep->role)) {
conn->cancel(Conn::operation::run);
self.complete(error::unexpected_server_role);
return;
}
conn->write_buffer_.clear();
conn->cmds_ = 0;
std::for_each(std::begin(conn->reqs_), std::end(conn->reqs_), [](auto const& ptr) {

View File

@@ -20,8 +20,8 @@ struct endpoint {
/// Redis server port.
std::string port;
/// Role master or replica.
std::string role{"master"};
/// Expected role if any.
std::string role{};
/// Username if authentication is required.
std::string username{};

View File

@@ -71,7 +71,13 @@ enum class error
not_a_double,
/// Got RESP3 null.
resp3_null
resp3_null,
/// Unexpected server role.
unexpected_server_role,
/// SSL handshake timeout.
ssl_handshake_timeout,
};
/** \internal

View File

@@ -40,6 +40,8 @@ struct error_category_impl : boost::system::error_category {
case error::incompatible_size: return "Aggregate container has incompatible size.";
case error::not_a_double: return "Not a double.";
case error::resp3_null: return "Got RESP3 null.";
case error::unexpected_server_role: return "Unexpected server role.";
case error::ssl_handshake_timeout: return "SSL handshake timeout.";
default: BOOST_ASSERT(false); return "Aedis error.";
}
}

View File

@@ -15,6 +15,102 @@
namespace aedis::ssl {
namespace detail
{
#include <boost/asio/yield.hpp>
template <class Stream>
struct handshake_op {
Stream* stream;
aedis::detail::conn_timer_t<typename Stream::executor_type>* timer;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token)
{
return stream->async_handshake(boost::asio::ssl::stream_base::client, token);
},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0: self.complete(ec1); return;
case 1:
{
BOOST_ASSERT_MSG(!ec2, "handshake_op: Incompatible state.");
self.complete(error::ssl_handshake_timeout);
return;
}
default: BOOST_ASSERT(false);
}
}
}
};
template <
class Stream,
class CompletionToken
>
auto async_handshake(
Stream& stream,
aedis::detail::conn_timer_t<typename Stream::executor_type>& timer,
CompletionToken&& token)
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(handshake_op<Stream>{&stream, &timer}, token, stream, timer);
}
template <class Conn>
struct ssl_connect_with_timeout_op {
Conn* conn = nullptr;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::endpoint const& = {})
{
reenter (coro)
{
conn->ping_timer_.expires_after(conn->get_config().connect_timeout);
yield
aedis::detail::async_connect(
conn->lowest_layer(), conn->ping_timer_, conn->endpoints_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
conn->ping_timer_.expires_after(conn->get_config().handshake_timeout);
yield
async_handshake(conn->next_layer(), conn->ping_timer_, std::move(self));
self.complete(ec);
}
}
};
#include <boost/asio/unyield.hpp>
} // detail
template <class>
class connection;
@@ -34,6 +130,7 @@ public:
using executor_type = typename next_layer_type::executor_type;
using base_type = connection_base<executor_type, connection<boost::asio::ssl::stream<AsyncReadWriteStream>>>;
using this_type = connection<next_layer_type>;
/** @brief Connection configuration parameters.
*/
@@ -44,6 +141,9 @@ public:
/// Timeout of the connect operation.
std::chrono::milliseconds connect_timeout = std::chrono::seconds{10};
/// Timeout of the ssl handshake operation.
std::chrono::milliseconds handshake_timeout = std::chrono::seconds{10};
/// Time interval of ping operations.
std::chrono::milliseconds ping_interval = std::chrono::seconds{1};
@@ -86,19 +186,18 @@ public:
auto& lowest_layer() noexcept { return stream_.lowest_layer(); }
auto is_open() const noexcept { return stream_.next_layer().is_open(); }
template <
class EndpointSequence,
class CompletionToken
>
auto async_connect(
detail::conn_timer_t<executor_type>& timer,
EndpointSequence ep,
CompletionToken&& token)
template <class CompletionToken>
auto async_connect(CompletionToken&& token)
{
return detail::async_connect(lowest_layer(), timer, ep, std::move(token));
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::ssl_connect_with_timeout_op<this_type>{this}, token, stream_);
}
private:
template <class> friend struct detail::ssl_connect_with_timeout_op;
config cfg_;
executor_type ex_;
next_layer_type stream_;

View File

@@ -77,3 +77,24 @@ BOOST_AUTO_TEST_CASE(test_auth_fail)
ioc.run();
}
BOOST_AUTO_TEST_CASE(test_wrong_role)
{
std::cout << boost::unit_test::framework::current_test_case().p_name << std::endl;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor());
// Should cause an error in the authentication as our redis server
// has no authentication configured.
endpoint ep;
ep.host = "127.0.0.1";
ep.port = "6379";
ep.role = "errado";
db->async_run(ep, [](auto ec) {
BOOST_CHECK_EQUAL(ec, aedis::error::unexpected_server_role);
});
ioc.run();
}

View File

@@ -767,6 +767,7 @@ BOOST_AUTO_TEST_CASE(error)
check_error("aedis", aedis::error::incompatible_size);
check_error("aedis", aedis::error::not_a_double);
check_error("aedis", aedis::error::resp3_null);
check_error("aedis", aedis::error::unexpected_server_role);
}
std::string get_type_as_str(aedis::resp3::type t)