2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Simplifies and enhances code modularity.

This commit is contained in:
Marcelo Zimbres
2022-06-05 18:26:08 +02:00
parent 5893f0913e
commit a411cc50fc
20 changed files with 472 additions and 364 deletions

View File

@@ -52,12 +52,12 @@ endif
nobase_include_HEADERS =\
$(top_srcdir)/aedis/src.hpp\
$(top_srcdir)/aedis/error.hpp\
$(top_srcdir)/aedis/impl/error.ipp\
$(top_srcdir)/aedis/detail/net.hpp\
$(top_srcdir)/aedis/redis/command.hpp\
$(top_srcdir)/aedis/generic/connection.hpp\
$(top_srcdir)/aedis/generic/request.hpp\
$(top_srcdir)/aedis/generic/error.hpp\
$(top_srcdir)/aedis/generic/adapt.hpp\
$(top_srcdir)/aedis/generic/impl/error.ipp\
$(top_srcdir)/aedis/generic/detail/connection_ops.hpp\
$(top_srcdir)/aedis/sentinel/command.hpp\
$(top_srcdir)/aedis/aedis.hpp\
@@ -74,7 +74,9 @@ nobase_include_HEADERS =\
$(top_srcdir)/aedis/resp3/impl/error.ipp\
$(top_srcdir)/aedis/resp3/type.hpp\
$(top_srcdir)/aedis/resp3/read.hpp\
$(top_srcdir)/aedis/resp3/exec.hpp\
$(top_srcdir)/aedis/resp3/write.hpp\
$(top_srcdir)/aedis/resp3/request.hpp\
$(top_srcdir)/aedis/redis/impl/command.ipp\
$(top_srcdir)/aedis/sentinel/impl/command.ipp\
$(top_srcdir)/aedis/resp3/detail/impl/parser.ipp\

View File

@@ -25,8 +25,8 @@
#include <boost/utility/string_view.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/generic/request.hpp>
#include <aedis/resp3/node.hpp>
#include <aedis/adapter/error.hpp>

View File

@@ -7,14 +7,15 @@
#ifndef AEDIS_HPP
#define AEDIS_HPP
#include <aedis/error.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/write.hpp>
#include <aedis/resp3/exec.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/adapter/adapt.hpp>
#include <aedis/adapter/error.hpp>
#include <aedis/redis/command.hpp>
#include <aedis/sentinel/command.hpp>
#include <aedis/generic/error.hpp>
#include <aedis/generic/request.hpp>
#include <aedis/generic/connection.hpp>
#include <aedis/generic/adapt.hpp>

170
aedis/detail/net.hpp Normal file
View File

@@ -0,0 +1,170 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_NET_HPP
#define AEDIS_NET_HPP
#include <array>
#include <boost/system.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/connect.hpp>
#include <boost/assert.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
namespace aedis {
namespace detail {
#include <boost/asio/yield.hpp>
template <
class Protocol,
class Executor,
class EndpointSequence
>
struct connect_op {
boost::asio::basic_socket<Protocol, Executor>* socket;
boost::asio::steady_timer* timer;
EndpointSequence* endpoints;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, typename Protocol::endpoint const& ep = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token)
{
auto f = [](boost::system::error_code const&, typename Protocol::endpoint const&) { return true; };
return boost::asio::async_connect(*socket, *endpoints, f, token);
},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, ep);
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(error::connect_timeout, ep);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, ep);
}
}
};
struct resolve_op {
boost::asio::ip::tcp::resolver* resv;
boost::asio::steady_timer* timer;
boost::string_view host;
boost::string_view port;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::asio::ip::tcp::resolver::results_type res = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return resv->async_resolve(host.data(), port.data(), token);},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, {});
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(error::resolve_timeout, {});
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, res);
}
}
};
#include <boost/asio/unyield.hpp>
template <
class Protocol,
class Executor,
class EndpointSequence,
class CompletionToken = boost::asio::default_completion_token_t<Executor>
>
auto async_connect(
boost::asio::basic_socket<Protocol, Executor>& socket,
boost::asio::steady_timer& timer,
EndpointSequence ep,
CompletionToken&& token = boost::asio::default_completion_token_t<Executor>{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, typename Protocol::endpoint const&)
>(connect_op<Protocol, Executor, EndpointSequence>
{&socket, &timer, &ep}, token, socket, timer);
}
template <
class CompletionToken =
boost::asio::default_completion_token_t<boost::asio::ip::tcp::resolver::executor_type>
>
auto async_resolve(
boost::asio::ip::tcp::resolver& resv,
boost::asio::steady_timer& timer,
boost::string_view host,
boost::string_view port,
CompletionToken&& token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, boost::asio::ip::tcp::resolver::results_type)
>(resolve_op{&resv, &timer, host, port}, token, resv, timer);
}
} // detail
} // aedis
#endif // AEDIS_NET_HPP

View File

@@ -4,13 +4,12 @@
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_GENERIC_ERROR_HPP
#define AEDIS_GENERIC_ERROR_HPP
#ifndef AEDIS_ERROR_HPP
#define AEDIS_ERROR_HPP
#include <boost/system/error_code.hpp>
namespace aedis {
namespace generic {
/** \brief Generic errors.
* \ingroup any
@@ -31,9 +30,6 @@ enum class error
/// Idle timeout.
idle_timeout,
/// Write stop requested.
write_stop_requested,
};
/** \brief Creates a error_code object from an error.
@@ -41,14 +37,13 @@ enum class error
*/
boost::system::error_code make_error_code(error e);
} // generic
} // aedis
namespace std {
template<>
struct is_error_code_enum<::aedis::generic::error> : std::true_type {};
struct is_error_code_enum<::aedis::error> : std::true_type {};
} // std
#endif // AEDIS_GENERIC_ERROR_HPP
#endif // AEDIS_ERROR_HPP

View File

@@ -17,8 +17,8 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/generic/adapt.hpp>
#include <aedis/generic/request.hpp>
#include <aedis/generic/detail/connection_ops.hpp>
namespace aedis {
@@ -43,19 +43,13 @@ public:
using next_layer_type = AsyncReadWriteStream;
/// Type of requests used by the connection.
using request_type = generic::request<Command>;
using request_type = resp3::request<Command>;
using default_completion_token_type = boost::asio::default_completion_token_t<executor_type>;
/** @brief Configuration parameters.
*/
struct config {
/// Ip address or name of the Redis server.
std::string host = "127.0.0.1";
/// Port where the Redis server is listening.
std::string port = "6379";
/// Timeout of the \c async_resolve operation.
std::chrono::milliseconds resolve_timeout = std::chrono::seconds{5};
@@ -168,12 +162,16 @@ public:
* \return This function returns only when there is an error.
*/
template <class CompletionToken = default_completion_token_type>
auto async_run(CompletionToken token = CompletionToken{})
auto
async_run(
boost::string_view host = "127.0.0.1",
boost::string_view port = "6379",
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::run_op<connection, Command>{this}, token, resv_);
>(detail::run_op<connection, Command>{this, host, port}, token, resv_);
}
/** @brief Asynchrnously schedules a command for execution.
@@ -236,9 +234,7 @@ private:
template <class T, class U> friend struct detail::ping_op;
template <class T, class U> friend struct detail::run_op;
template <class T, class U> friend struct detail::exec_op;
template <class T> friend struct detail::exec_internal_impl_op;
template <class T> friend struct detail::exec_internal_op;
template <class T> friend struct detail::write_op;
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::write_with_timeout_op;
template <class T> friend struct detail::connect_with_timeout_op;
@@ -268,22 +264,20 @@ private:
}
auto make_dynamic_buffer()
{
return boost::asio::dynamic_buffer(read_buffer_, cfg_.max_read_size);
}
{ return boost::asio::dynamic_buffer(read_buffer_, cfg_.max_read_size); }
// Calls connection::async_resolve with the resolve timeout passed in
// the config. Uses the write_timer_ to perform the timeout op.
template <class CompletionToken = default_completion_token_type>
auto
async_resolve_with_timeout(
boost::string_view host,
boost::string_view port,
CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::resolve_with_timeout_op<connection>{this},
token, resv_.get_executor());
>(detail::resolve_with_timeout_op<connection>{this, host, port},
token, resv_);
}
// Calls connection::async_connect with a timeout.
@@ -309,16 +303,6 @@ private:
>(detail::reader_op<connection, Command>{this}, token, resv_.get_executor());
}
template <class CompletionToken = default_completion_token_type>
auto
async_write(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::write_op<connection>{this}, token, resv_);
}
// Write with a timeout.
template <class CompletionToken = default_completion_token_type>
auto
@@ -371,18 +355,6 @@ private:
>(detail::check_idle_op<connection>{this}, token, check_idle_timer_);
}
template <class CompletionToken = default_completion_token_type>
auto async_exec_internal_impl(
request_type const& req,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::exec_internal_impl_op<connection>{this, &req},
token, resv_);
}
template <class CompletionToken = default_completion_token_type>
auto async_exec_internal(
request_type const& req,

View File

@@ -9,18 +9,19 @@
#include <array>
#include <boost/assert.hpp>
#include <boost/system.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/connect.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/assert.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <aedis/error.hpp>
#include <aedis/detail/net.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/exec.hpp>
#include <aedis/resp3/write.hpp>
#include <aedis/generic/error.hpp>
#include <aedis/generic/adapt.hpp>
namespace aedis {
@@ -30,36 +31,18 @@ namespace detail {
#include <boost/asio/yield.hpp>
template <class Conn>
struct exec_internal_impl_op {
Conn* cli;
typename Conn::request_type const* req;
struct connect_with_timeout_op {
Conn* conn;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
, boost::asio::ip::tcp::endpoint const& ep = {})
{
reenter (coro)
{
yield
boost::asio::async_write(
*cli->socket_,
boost::asio::buffer(req->payload()),
std::move(self));
if (ec) {
self.complete(ec);
return;
}
yield
resp3::async_read(
*cli->socket_,
cli->make_dynamic_buffer(),
[](resp3::node<boost::string_view> const&, boost::system::error_code&) { },
std::move(self));
yield aedis::detail::async_connect(*conn->socket_, conn->write_timer_, conn->endpoints_, std::move(self));
self.complete(ec);
}
}
@@ -73,44 +56,35 @@ struct exec_internal_op {
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::system::error_code ec2 = {})
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
{
// Idle timeout.
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_interval);
yield resp3::async_exec( *cli->socket_, cli->check_idle_timer_, *req, adapter::adapt(), cli->make_dynamic_buffer(), std::move(self));
self.complete(ec);
}
}
};
template <class Conn>
struct resolve_with_timeout_op {
Conn* cli;
boost::string_view host;
boost::string_view port;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::resolver::results_type res = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->async_exec_internal_impl(*req, token);},
[this](auto token) { return cli->check_idle_timer_.async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1);
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(error::idle_timeout);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({});
aedis::detail::async_resolve(cli->resv_, cli->write_timer_, host, port, std::move(self));
cli->endpoints_ = res;
self.complete(ec);
}
}
};
@@ -316,160 +290,6 @@ struct check_idle_op {
}
};
template <class Conn>
struct resolve_with_timeout_op {
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::asio::ip::tcp::resolver::results_type res = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
// Tries to resolve with a timeout. We can use the writer
// timer here as there is no ongoing write operation.
cli->write_timer_.expires_after(cli->cfg_.resolve_timeout);
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->resv_.async_resolve(cli->cfg_.host.data(), cli->cfg_.port.data(), token);},
[this](auto token) { return cli->write_timer_.async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1);
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(error::resolve_timeout);
return;
}
} break;
default: BOOST_ASSERT(false);
}
cli->endpoints_ = res;
self.complete({});
}
}
};
template <
class Protocol,
class Executor,
class EndpointSequence
>
struct connect_with_timeout_impl_op {
boost::asio::basic_socket<Protocol, Executor>* socket;
boost::asio::steady_timer* timer;
EndpointSequence* endpoints;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, typename Protocol::endpoint const& ep = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token)
{
auto f = [](boost::system::error_code const&, typename Protocol::endpoint const&) { return true; };
return boost::asio::async_connect(*socket, *endpoints, f, token);
},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, ep);
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(error::connect_timeout, ep);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, ep);
}
}
};
template <
class Protocol,
class Executor,
class EndpointSequence,
class CompletionToken = boost::asio::default_completion_token_t<Executor>
>
auto async_connect_with_timeout(
boost::asio::basic_socket<Protocol, Executor>& socket,
boost::asio::steady_timer& timer,
EndpointSequence ep,
CompletionToken&& token = boost::asio::default_completion_token_t<Executor>{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, typename Protocol::endpoint const&)
>(connect_with_timeout_impl_op<Protocol, Executor, EndpointSequence>
{&socket, &timer, &ep}, token, socket, timer);
}
template <class Conn>
struct connect_with_timeout_op {
Conn* conn;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::endpoint const& ep = {})
{
reenter (coro)
{
conn->write_timer_.expires_after(conn->cfg_.connect_timeout);
yield
async_connect_with_timeout(
*conn->socket_,
conn->write_timer_,
conn->endpoints_,
std::move(self));
self.complete(ec);
}
}
};
template <class Conn>
struct read_write_check_ping_op {
Conn* cli;
@@ -527,6 +347,8 @@ struct read_write_check_ping_op {
template <class Conn, class Command>
struct run_op {
Conn* cli;
boost::string_view host;
boost::string_view port;
boost::asio::coroutine coro;
template <class Self>
@@ -534,7 +356,8 @@ struct run_op {
{
reenter (coro)
{
yield cli->async_resolve_with_timeout(std::move(self));
cli->write_timer_.expires_after(cli->cfg_.resolve_timeout);
yield cli->async_resolve_with_timeout(host, port, std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -545,6 +368,7 @@ struct run_op {
typename Conn::next_layer_type
>(cli->resv_.get_executor());
cli->write_timer_.expires_after(cli->cfg_.connect_timeout);
yield cli->async_connect_with_timeout(std::move(self));
if (ec) {
self.complete(ec);
@@ -553,6 +377,7 @@ struct run_op {
cli->req_.clear();
cli->req_.push(Command::hello, 3);
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_interval);
yield cli->async_exec_internal(cli->req_, std::move(self));
if (ec) {
cli->close();
@@ -571,51 +396,6 @@ struct run_op {
}
};
template <class Conn>
struct write_op {
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
{
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(!cli->payload_next_.empty());
// Prepare for the next write.
cli->n_cmds_ = cli->n_cmds_next_;
cli->n_cmds_next_ = 0;
cli->payload_ = cli->payload_next_;
cli->payload_next_.clear();
yield
boost::asio::async_write(
*cli->socket_,
boost::asio::buffer(cli->payload_),
std::move(self));
BOOST_ASSERT(!cli->reqs_.empty());
if (cli->reqs_.front()->n_cmds == 0) {
// Some requests don't have response, so their timers
// won't be canceled on read op, we have to do it here.
cli->reqs_.front()->timer.cancel_one();
// Notice we don't have to call
// cli->wait_read_timer_.cancel_one(); as that operation
// is ongoing.
self.complete({}, n);
return;
}
cli->payload_.clear();
self.complete(ec, n);
}
}
};
template <class Conn>
struct write_with_timeout_op {
Conn* cli;
@@ -630,11 +410,9 @@ struct write_with_timeout_op {
{
reenter (coro)
{
cli->write_timer_.expires_after(cli->cfg_.write_timeout);
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->async_write(token);},
[this](auto token) { return boost::asio::async_write(*cli->socket_, boost::asio::buffer(cli->payload_), token);},
[this](auto token) { return cli->write_timer_.async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
@@ -678,22 +456,40 @@ struct writer_op {
{
reenter (coro) for (;;)
{
// When cli->cmds_ we are still processing the last request.
// The timer must be however canceled so we can unblock the
// channel.
if (!cli->reqs_.empty()) {
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(!cli->payload_next_.empty());
// Prepare for the next write.
cli->n_cmds_ = cli->n_cmds_next_;
cli->n_cmds_next_ = 0;
cli->payload_ = cli->payload_next_;
cli->payload_next_.clear();
cli->write_timer_.expires_after(cli->cfg_.write_timeout);
yield cli->async_write_with_timeout(std::move(self));
if (ec) {
cli->close();
self.complete(ec);
return;
}
cli->payload_.clear();
BOOST_ASSERT(!cli->reqs_.empty());
if (cli->reqs_.front()->n_cmds == 0) {
// Some requests don't have response, so their timers
// won't be canceled on read op, we have to do it here.
cli->reqs_.front()->timer.cancel_one();
// Notice we don't have to call
// cli->wait_read_timer_.cancel_one(); as that
// operation is ongoing.
}
}
yield cli->wait_write_timer_.async_wait(std::move(self));
if (!cli->socket_->is_open()) {
self.complete(error::write_stop_requested);
// The completion has been explicited requested.
self.complete({});
return;
}
}

View File

@@ -4,17 +4,16 @@
* accompanying file LICENSE.txt)
*/
#include <aedis/generic/error.hpp>
#include <aedis/error.hpp>
namespace aedis {
namespace generic {
namespace detail {
struct error_category_impl : boost::system::error_category {
char const* name() const noexcept override
{
return "aedis.generic";
return "aedis";
}
std::string message(int ev) const override
@@ -25,7 +24,6 @@ struct error_category_impl : boost::system::error_category {
case error::read_timeout: return "Read operation timeout.";
case error::write_timeout: return "Write operation timeout.";
case error::idle_timeout: return "Idle timeout.";
case error::write_stop_requested: return "Write stop requested.";
default: BOOST_ASSERT(false);
}
}
@@ -44,5 +42,4 @@ boost::system::error_code make_error_code(error e)
return boost::system::error_code{static_cast<int>(e), detail::category()};
}
} // generic
} // aedis

180
aedis/resp3/exec.hpp Normal file
View File

@@ -0,0 +1,180 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_RESP3_EXEC_HPP
#define AEDIS_RESP3_EXEC_HPP
#include <boost/assert.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/request.hpp>
namespace aedis {
namespace resp3 {
namespace detail {
#include <boost/asio/yield.hpp>
template <
class AsyncStream,
class Command,
class Adapter,
class DynamicBuffer
>
struct exec_op {
AsyncStream* socket;
request<Command> const* req;
Adapter adapter;
DynamicBuffer dbuf;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
{
yield
boost::asio::async_write(
*socket,
boost::asio::buffer(req->payload()),
std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
yield resp3::async_read(*socket, dbuf, adapter, std::move(self));
self.complete(ec, n);
}
}
};
#include <boost/asio/unyield.hpp>
} // detail
template <
class AsyncStream,
class Command,
class Adapter,
class DynamicBuffer,
class CompletionToken = boost::asio::default_completion_token_t<typename AsyncStream::executor_type>
>
auto async_exec(
AsyncStream& socket,
request<Command> const& req,
Adapter adapter,
DynamicBuffer dbuf,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<AsyncStream, Command, Adapter, DynamicBuffer>
{&socket, &req, adapter, dbuf}, token, socket);
}
namespace detail {
#include <boost/asio/yield.hpp>
template <
class AsyncStream,
class Command,
class Adapter,
class DynamicBuffer
>
struct exec_with_timeout_op {
AsyncStream* socket;
boost::asio::steady_timer* timer;
request<Command> const* req;
Adapter adapter;
DynamicBuffer dbuf;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, std::size_t n = 0
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return resp3::async_exec(*socket, *req, adapter, dbuf, token);},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, 0);
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(aedis::error::idle_timeout, 0);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, n);
}
}
};
#include <boost/asio/unyield.hpp>
} // detail
template <
class AsyncStream,
class Command,
class Adapter,
class DynamicBuffer,
class CompletionToken = boost::asio::default_completion_token_t<typename AsyncStream::executor_type>
>
auto async_exec(
AsyncStream& socket,
boost::asio::steady_timer& timer,
request<Command> const& req,
Adapter adapter,
DynamicBuffer dbuf,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_with_timeout_op<AsyncStream, Command, Adapter, DynamicBuffer>
{&socket, &timer, &req, adapter, dbuf}, token, socket, timer);
}
} // resp3
} // aedis
#endif // AEDIS_RESP3_EXEC_HPP

View File

@@ -4,8 +4,8 @@
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_GENERIC_REQUEST_HPP
#define AEDIS_GENERIC_REQUEST_HPP
#ifndef AEDIS_RESP3_REQUEST_HPP
#define AEDIS_RESP3_REQUEST_HPP
#include <boost/hana.hpp>
#include <aedis/resp3/compose.hpp>
@@ -17,7 +17,7 @@
// the value type is a pair.
namespace aedis {
namespace generic {
namespace resp3 {
/** @brief Creates Redis requests from user data.
* \ingroup any
@@ -209,7 +209,7 @@ private:
std::vector<command_info_type> commands_;
};
} // generic
} // resp3
} // aedis
#endif // AEDIS_GENERIC_SERIALIZER_HPP
#endif // AEDIS_RESP3_SERIALIZER_HPP

View File

@@ -4,10 +4,10 @@
* accompanying file LICENSE.txt)
*/
#include <aedis/impl/error.ipp>
#include <aedis/resp3/impl/type.ipp>
#include <aedis/resp3/detail/impl/parser.ipp>
#include <aedis/resp3/impl/error.ipp>
#include <aedis/redis/impl/command.ipp>
#include <aedis/adapter/impl/error.ipp>
#include <aedis/sentinel/impl/command.ipp>
#include <aedis/generic/impl/error.ipp>

View File

@@ -15,8 +15,8 @@
namespace net = boost::asio;
namespace adapter = aedis::adapter;
using aedis::resp3::request;
using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;
using node_type = aedis::resp3::node<boost::string_view>;
using error_code = boost::system::error_code;
@@ -50,7 +50,7 @@ int main()
req.push(command::quit);
db.async_exec(req, adapter, handler);
db.async_run(handler);
db.async_run("127.0.0.1", "6379", handler);
ioc.run();
std::cout

View File

@@ -15,8 +15,8 @@
namespace net = boost::asio;
namespace generic = aedis::generic;
using aedis::resp3::request;
using aedis::redis::command;
using aedis::generic::request;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
using connection = aedis::generic::connection<command, tcp_socket>;
@@ -139,7 +139,7 @@ int main()
// Redis client and receiver.
auto db = std::make_shared<connection>(ioc);
db->async_run(handler);
db->async_run("127.0.0.1", "6379", handler);
auto sessions = std::make_shared<sessions_type>();
net::co_spawn(ioc, reader(db, sessions), net::detached);

View File

@@ -17,8 +17,8 @@
namespace net = boost::asio;
namespace generic = aedis::generic;
using boost::optional;
using aedis::resp3::request;
using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;
// Response used in this example.
@@ -43,7 +43,6 @@ int main()
req1.push_range(command::rpush, "rpush-key", vec);
req1.push_range(command::sadd, "sadd-key", set);
req1.push_range(command::hset, "hset-key", map);
db.async_exec(req1, generic::adapt(), handler);
// Request that retrieves the containers.
request<command> req2;
@@ -63,9 +62,9 @@ int main()
std::string // quit
> resp;
db.async_exec(req1, generic::adapt(), handler);
db.async_exec(req2, generic::adapt(resp), handler);
db.async_run(handler);
db.async_run("127.0.0.1", "6379", handler);
ioc.run();
auto const& r = std::get<4>(resp);

View File

@@ -12,7 +12,7 @@
namespace net = boost::asio;
namespace generic = aedis::generic;
using generic::request;
using aedis::resp3::request;
using aedis::redis::command;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
@@ -39,7 +39,7 @@ net::awaitable<void> listener()
{
auto ex = co_await net::this_coro::executor;
auto db = std::make_shared<connection>(ex);
db->async_run(net::detached);
db->async_run("127.0.0.1", "6379", net::detached);
tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555});
for (;;)

View File

@@ -13,8 +13,8 @@
namespace net = boost::asio;
namespace generic = aedis::generic;
using aedis::resp3::request;
using aedis::redis::command;
using aedis::generic::request;
using connection = generic::connection<command>;
auto handler =[](auto ec, auto...)
@@ -31,7 +31,7 @@ int main()
net::io_context ioc;
connection db{ioc};
db.async_exec(req, generic::adapt(resp), handler);
db.async_run(handler);
db.async_run("127.0.0.1", "6379", handler);
ioc.run();
std::cout << std::get<0>(resp) << std::endl;

View File

@@ -14,8 +14,8 @@
namespace net = boost::asio;
namespace generic = aedis::generic;
using aedis::resp3::request;
using aedis::redis::command;
using aedis::generic::request;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using connection = generic::connection<command, tcp_socket>;
using response_type = std::vector<aedis::resp3::node<std::string>>;
@@ -61,6 +61,6 @@ int main()
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
net::co_spawn(ioc, reader(db), net::detached);
db->async_run(handler);
db->async_run("127.0.0.1", "6379", handler);
ioc.run();
}

View File

@@ -22,8 +22,8 @@ namespace net = boost::asio;
namespace resp3 = aedis::resp3;
namespace generic = aedis::generic;
using aedis::resp3::request;
using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;
using error_code = boost::system::error_code;
using net::experimental::as_tuple;
@@ -45,10 +45,8 @@ void test_resolve_error()
};
net::io_context ioc;
connection::config cfg;
cfg.host = "Atibaia";
connection db(ioc, cfg);
db.async_run(f);
connection db(ioc);
db.async_run("Atibaia", "6379", f);
ioc.run();
}
@@ -62,10 +60,8 @@ void test_connect_error()
};
net::io_context ioc;
connection::config cfg;
cfg.port = "1";
connection db(ioc, cfg);
db.async_run(f);
connection db(ioc);
db.async_run("127.0.0.1", "1", f);
ioc.run();
}
@@ -85,7 +81,7 @@ void test_quit()
//expect_eq(r, 152UL);
});
db->async_run([](auto ec){
db->async_run("127.0.0.1", "6379", [](auto ec){
expect_error(ec, net::error::misc_errors::eof);
});
@@ -125,7 +121,7 @@ void test_push()
net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);
db->async_run([](auto ec){
db->async_run("127.0.0.1", "6379", [](auto ec){
expect_error(ec, net::error::misc_errors::eof);
});
@@ -143,7 +139,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
expect_no_error(ec);
});
auto [ec] = co_await db->async_run(as_tuple(net::use_awaitable));
auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable));
expect_error(ec, net::error::misc_errors::eof);
}
@@ -154,7 +150,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
expect_no_error(ec);
});
auto [ec] = co_await db->async_run(as_tuple(net::use_awaitable));
auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable));
expect_error(ec, net::error::misc_errors::eof);
}
}
@@ -188,8 +184,8 @@ void test_idle()
expect_no_error(ec);
});
db->async_run([](auto ec){
expect_error(ec, aedis::generic::error::idle_timeout);
db->async_run("127.0.0.1", "6379", [](auto ec){
expect_error(ec, aedis::error::idle_timeout);
});
ioc.run();

View File

@@ -15,8 +15,8 @@
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::resp3::request;
using aedis::redis::command;
using aedis::generic::request;
using aedis::adapter::adapt;
using net::ip::tcp;

View File

@@ -16,8 +16,8 @@
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::resp3::request;
using aedis::redis::command;
using aedis::generic::request;
using aedis::resp3::node;
using aedis::adapter::adapt;
using net::ip::tcp;