2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Fixes executor usage in connection class.

This commit is contained in:
Marcelo Zimbres
2022-07-16 21:21:13 +02:00
parent ab68e8a31d
commit 76b6106caa
4 changed files with 51 additions and 31 deletions

View File

@@ -69,7 +69,7 @@ public:
* \param ex The executor.
* \param cfg Configuration parameters.
*/
connection(boost::asio::any_io_executor ex, config cfg = config{})
connection(executor_type ex, config cfg = config{})
: resv_{ex}
, ping_timer_{ex}
, check_idle_timer_{ex}
@@ -299,7 +299,7 @@ public:
private:
struct req_info {
req_info(boost::asio::any_io_executor ex) : timer{ex} {}
req_info(executor_type ex) : timer{ex} {}
boost::asio::steady_timer timer;
resp3::request const* req = nullptr;
std::size_t cmds = 0;
@@ -440,15 +440,19 @@ private:
}
}
using channel_type = boost::asio::experimental::channel<void(boost::system::error_code, std::size_t)>;
using channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, std::size_t)>;
using clock_type = std::chrono::steady_clock;
using clock_traits_type = boost::asio::wait_traits<clock_type>;
using timer_type = boost::asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
using resolver_type = boost::asio::ip::basic_resolver<boost::asio::ip::tcp, executor_type>;
// IO objects
boost::asio::ip::tcp::resolver resv_;
resolver_type resv_;
std::shared_ptr<AsyncReadWriteStream> socket_;
boost::asio::steady_timer ping_timer_;
boost::asio::steady_timer check_idle_timer_;
boost::asio::steady_timer writer_timer_;
boost::asio::steady_timer read_timer_;
timer_type ping_timer_;
timer_type check_idle_timer_;
timer_type writer_timer_;
timer_type read_timer_;
channel_type push_channel_;
config cfg_;

View File

@@ -18,6 +18,9 @@
namespace aedis {
namespace detail {
template <class Executor>
using conn_timer_t = boost::asio::basic_waitable_timer<std::chrono::steady_clock, boost::asio::wait_traits<std::chrono::steady_clock>, Executor>;
#include <boost/asio/yield.hpp>
template <
@@ -27,7 +30,7 @@ template <
>
struct connect_op {
boost::asio::basic_socket<Protocol, Executor>* socket;
boost::asio::steady_timer* timer;
conn_timer_t<Executor>* timer;
EndpointSequence* endpoints;
boost::asio::coroutine coro;
@@ -77,9 +80,10 @@ struct connect_op {
}
};
template <class Resolver, class Timer>
struct resolve_op {
boost::asio::ip::tcp::resolver* resv;
boost::asio::steady_timer* timer;
Resolver* resv;
Timer* timer;
boost::string_view host;
boost::string_view port;
boost::asio::coroutine coro;
@@ -160,7 +164,7 @@ template <
>
auto async_connect(
boost::asio::basic_socket<Protocol, Executor>& socket,
boost::asio::steady_timer& timer,
conn_timer_t<Executor>& timer,
EndpointSequence ep,
CompletionToken&& token = boost::asio::default_completion_token_t<Executor>{})
{
@@ -172,20 +176,24 @@ auto async_connect(
}
template <
class Resolver,
class Timer,
class CompletionToken =
boost::asio::default_completion_token_t<boost::asio::ip::tcp::resolver::executor_type>
boost::asio::default_completion_token_t<typename Resolver::executor_type>
>
auto async_resolve(
boost::asio::ip::tcp::resolver& resv,
boost::asio::steady_timer& timer,
Resolver& resv,
Timer& timer,
boost::string_view host,
boost::string_view port,
CompletionToken&& token = CompletionToken{})
{
// TODO: Use static_assert to check Resolver::executor_type and
// Timer::executor_type are same.
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, boost::asio::ip::tcp::resolver::results_type)
>(resolve_op{&resv, &timer, host, port}, token, resv, timer);
>(resolve_op<Resolver, Timer>{&resv, &timer, host, port}, token, resv, timer);
}
template <

View File

@@ -19,12 +19,16 @@
namespace net = boost::asio;
namespace this_coro = net::this_coro;
using net::ip::tcp;
using net::awaitable;
using net::co_spawn;
using net::detached;
using net::use_awaitable;
using executor_type = net::io_context::executor_type;
using socket_type = net::basic_stream_socket<net::ip::tcp, executor_type>;
using tcp_socket = net::use_awaitable_t<executor_type>::as_default_on_t<socket_type>;
using acceptor_type = net::basic_socket_acceptor<net::ip::tcp, executor_type>;
using tcp_acceptor = net::use_awaitable_t<executor_type>::as_default_on_t<acceptor_type>;
using awaitable_type = net::awaitable<void, executor_type>;
constexpr net::use_awaitable_t<executor_type> use_awaitable;
awaitable<void> echo(tcp::socket socket)
awaitable_type echo(tcp_socket socket)
{
try {
char data[1024];
@@ -37,20 +41,20 @@ awaitable<void> echo(tcp::socket socket)
}
}
awaitable<void> listener()
awaitable_type listener()
{
auto executor = co_await this_coro::executor;
tcp::acceptor acceptor(executor, {tcp::v4(), 55555});
auto ex = co_await this_coro::executor;
tcp_acceptor acceptor(ex, {tcp::v4(), 55555});
for (;;) {
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(executor, echo(std::move(socket)), detached);
tcp_socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(ex, echo(std::move(socket)), detached);
}
}
int main()
{
try {
net::io_context io_context(1);
net::io_context io_context{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO};
co_spawn(io_context, listener(), detached);
io_context.run();
} catch (std::exception const& e) {

View File

@@ -16,11 +16,15 @@
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
using executor_type = net::io_context::executor_type;
using socket_type = net::basic_stream_socket<net::ip::tcp, executor_type>;
using tcp_socket = net::use_awaitable_t<executor_type>::as_default_on_t<socket_type>;
using acceptor_type = net::basic_socket_acceptor<net::ip::tcp, executor_type>;
using tcp_acceptor = net::use_awaitable_t<executor_type>::as_default_on_t<acceptor_type>;
using awaitable_type = net::awaitable<void, executor_type>;
using connection = aedis::connection<tcp_socket>;
net::awaitable<void> echo_loop(tcp_socket socket, std::shared_ptr<connection> db)
awaitable_type echo_loop(tcp_socket socket, std::shared_ptr<connection> db)
{
try {
request req;
@@ -41,7 +45,7 @@ net::awaitable<void> echo_loop(tcp_socket socket, std::shared_ptr<connection> db
}
}
net::awaitable<void> listener()
awaitable_type listener()
{
auto ex = co_await net::this_coro::executor;
auto db = std::make_shared<connection>(ex);
@@ -59,7 +63,7 @@ net::awaitable<void> listener()
int main()
{
try {
net::io_context ioc;
net::io_context ioc{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO};
co_spawn(ioc, listener(), net::detached);
ioc.run();
} catch (std::exception const& e) {