2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Progresses removing the second async_run overload.

This commit is contained in:
Marcelo Zimbres
2022-10-07 22:43:32 +02:00
parent 4c298ddc6b
commit ba82c6cd84
14 changed files with 148 additions and 87 deletions

View File

@@ -71,6 +71,8 @@ add_executable(test_connection_quit tests/connection_quit.cpp)
add_executable(test_connection_quit_coalesce tests/connection_quit_coalesce.cpp)
add_executable(test_connection_reconnect tests/connection_reconnect.cpp)
add_executable(test_connection_tls tests/connection_tls.cpp)
add_executable(echo_server_client benchmarks/cpp/asio/echo_server_client.cpp)
add_executable(echo_server_direct benchmarks/cpp/asio/echo_server_direct.cpp)
target_compile_features(chat_room PUBLIC cxx_std_20)
target_compile_features(intro PUBLIC cxx_std_17)
@@ -89,6 +91,8 @@ target_compile_features(test_connection_quit PUBLIC cxx_std_17)
target_compile_features(test_connection_quit_coalesce PUBLIC cxx_std_17)
target_compile_features(test_connection_reconnect PUBLIC cxx_std_20)
target_compile_features(test_connection_tls PUBLIC cxx_std_17)
target_compile_features(echo_server_client PUBLIC cxx_std_20)
target_compile_features(echo_server_direct PUBLIC cxx_std_20)
target_link_libraries(intro_tls OpenSSL::Crypto OpenSSL::SSL)
target_link_libraries(test_connection_tls OpenSSL::Crypto OpenSSL::SSL)

View File

@@ -24,7 +24,7 @@ with only three library entities
* `aedis::adapt()`: Adapts user data structures like STL containers to
receive Redis responses.
Let us see how this works in more detail.
Let us see how it works in more detail.
### Connection
@@ -702,7 +702,15 @@ The code used in the benchmarks can be found at
## Changelog
### v1.1.0
### master
* Renames `fail_on_connection_lost` to
`aedis::resp3::request::fail_on_connection_lost` and change its
behaviour: Setting it to true won't cause the request to be fail if
`async_exec` is called when there is no ongoing connection, which is
not the role of `aedis::resp3::request::fail_on_connection_lost`.
### v1.1.0/1
* Removes `coalesce_requests` from the `aedis::connection::config`, it
became a request property now, see `aedis::resp3::request::config::coalesce`.

View File

@@ -24,7 +24,7 @@
nodes near coords align={horizontal},
]
\addplot coordinates {
(31.1,Asio)
(29.5,Asio)
(30.7,Tokio)
(35.6,Go)
(43.6,Libuv)

View File

@@ -35,17 +35,17 @@ using stimer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
// to monitor the message traffic.
// Receives messages from other users.
net::awaitable<void> push_receiver(std::shared_ptr<connection> db)
net::awaitable<void> push_receiver(std::shared_ptr<connection> conn)
{
for (std::vector<node<std::string>> resp;;) {
co_await db->async_receive_push(adapt(resp));
co_await conn->async_receive_push(adapt(resp));
print_push(resp);
resp.clear();
}
}
// Subscribes to the channels when a new connection is stablished.
net::awaitable<void> reconnect(std::shared_ptr<connection> db)
net::awaitable<void> reconnect(std::shared_ptr<connection> conn)
{
request req;
req.push("SUBSCRIBE", "chat-channel");
@@ -54,8 +54,8 @@ net::awaitable<void> reconnect(std::shared_ptr<connection> db)
endpoint ep{"127.0.0.1", "6379"};
for (;;) {
boost::system::error_code ec;
co_await db->async_run(ep, req, adapt(), {}, net::redirect_error(net::use_awaitable, ec));
db->reset_stream();
co_await conn->async_run(ep, req, adapt(), {}, net::redirect_error(net::use_awaitable, ec));
conn->reset_stream();
std::cout << ec.message() << std::endl;
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait();
@@ -63,13 +63,13 @@ net::awaitable<void> reconnect(std::shared_ptr<connection> db)
}
// Publishes messages to other users.
net::awaitable<void> publisher(stream_descriptor& in, std::shared_ptr<connection> db)
net::awaitable<void> publisher(stream_descriptor& in, std::shared_ptr<connection> conn)
{
for (std::string msg;;) {
auto n = co_await net::async_read_until(in, net::dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "chat-channel", msg);
co_await db->async_exec(req);
co_await conn->async_exec(req);
msg.erase(0, n);
}
}
@@ -80,10 +80,10 @@ auto main() -> int
net::io_context ioc{1};
stream_descriptor in{ioc, ::dup(STDIN_FILENO)};
auto db = std::make_shared<connection>(ioc);
co_spawn(ioc, publisher(in, db), net::detached);
co_spawn(ioc, push_receiver(db), net::detached);
co_spawn(ioc, reconnect(db), net::detached);
auto conn = std::make_shared<connection>(ioc);
co_spawn(ioc, publisher(in, conn), net::detached);
co_spawn(ioc, push_receiver(conn), net::detached);
co_spawn(ioc, reconnect(conn), net::detached);
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });

View File

@@ -19,6 +19,9 @@ using aedis::resp3::request;
using aedis::endpoint;
using connection = aedis::connection<>;
auto logger = [](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
auto main() -> int
{
try {
@@ -48,11 +51,9 @@ auto main() -> int
> resp;
net::io_context ioc;
connection db{ioc};
endpoint ep{"127.0.0.1", "6379"};
db.async_run(ep, req, adapt(resp), {}, [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});
connection conn{ioc};
conn.async_exec(req, adapt(resp), logger);
conn.async_run({"127.0.0.1", "6379"}, {}, logger);
ioc.run();
print(std::get<0>(std::get<5>(resp)).value());

View File

@@ -19,21 +19,22 @@ using aedis::resp3::request;
using aedis::endpoint;
using connection = aedis::connection<>;
auto logger = [](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
auto main() -> int
{
try {
net::io_context ioc;
connection db{ioc};
connection conn{ioc};
request req;
req.push("PING");
req.push("QUIT");
std::tuple<std::string, aedis::ignore> resp;
db.async_run({"127.0.0.1", "6379"}, req, adapt(resp), {}, [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});
conn.async_exec(req, adapt(resp), logger);
conn.async_run({"127.0.0.1", "6379"}, {}, logger);
ioc.run();
std::cout << std::get<0>(resp) << std::endl;

View File

@@ -83,10 +83,13 @@ bool operator<(user const& a, user const& b)
return std::tie(a.name, a.age, a.country) < std::tie(b.name, b.age, b.country);
}
auto logger = [](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
int main()
{
net::io_context ioc;
connection db{ioc};
connection conn{ioc};
std::set<user> users
{{"Joao", "58", "Brazil"} , {"Serge", "60", "France"}};
@@ -100,10 +103,8 @@ int main()
std::tuple<aedis::ignore, int, std::set<user>, std::string> resp;
endpoint ep{"127.0.0.1", "6379"};
db.async_run(ep, req, adapt(resp), {}, [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});
conn.async_exec(req, adapt(resp),logger);
conn.async_run(ep, {}, logger);
ioc.run();
// Print

View File

@@ -10,6 +10,7 @@
#include <tuple>
#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <aedis.hpp>
#include "print.hpp"
@@ -18,6 +19,7 @@
#include <aedis/src.hpp>
namespace net = boost::asio;
using namespace net::experimental::awaitable_operators;
using aedis::adapt;
using aedis::resp3::request;
using aedis::resp3::node;
@@ -43,10 +45,10 @@ using connection = aedis::connection<tcp_socket>;
*/
// Receives pushes.
net::awaitable<void> push_receiver(std::shared_ptr<connection> db)
net::awaitable<void> push_receiver(std::shared_ptr<connection> conn)
{
for (std::vector<node<std::string>> resp;;) {
co_await db->async_receive_push(adapt(resp));
co_await conn->async_receive_push(adapt(resp));
print_push(resp);
resp.clear();
}
@@ -55,18 +57,24 @@ net::awaitable<void> push_receiver(std::shared_ptr<connection> db)
// See
// - https://redis.io/docs/manual/sentinel.
// - https://redis.io/docs/reference/sentinel-clients.
net::awaitable<void> reconnect(std::shared_ptr<connection> db)
net::awaitable<void> reconnect(std::shared_ptr<connection> conn)
{
request req;
req.get_config().fail_if_not_connected = false;
req.get_config().fail_on_connection_lost = true;
req.push("SUBSCRIBE", "channel");
stimer timer{co_await net::this_coro::executor};
endpoint ep{"127.0.0.1", "6379"};
for (;;) {
boost::system::error_code ec;
co_await db->async_run(ep, req, adapt(), {}, net::redirect_error(net::use_awaitable, ec));
db->reset_stream();
std::cout << ec.message() << std::endl;
boost::system::error_code ec1, ec2;
co_await (
conn->async_run(ep, {}, net::redirect_error(net::use_awaitable, ec1)) &&
conn->async_exec(req, adapt(), net::redirect_error(net::use_awaitable, ec2))
);
conn->reset_stream();
std::clog << "reconnect (async_run): " << ec1.message() << std::endl;
std::clog << "reconnect (async_exec): " << ec2.message() << std::endl;
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait();
}
@@ -76,10 +84,10 @@ int main()
{
try {
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
auto conn = std::make_shared<connection>(ioc);
net::co_spawn(ioc, push_receiver(db), net::detached);
net::co_spawn(ioc, reconnect(db), net::detached);
net::co_spawn(ioc, push_receiver(conn), net::detached);
net::co_spawn(ioc, reconnect(conn), net::detached);
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });

View File

@@ -51,8 +51,9 @@ public:
, read_timer_{ex}
, push_channel_{ex}
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
, req_{{true}}
{
req_.get_config().fail_if_not_connected = false;
req_.get_config().fail_on_connection_lost = true;
writer_timer_.expires_at(std::chrono::steady_clock::time_point::max());
read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
@@ -84,7 +85,7 @@ public:
ping_timer_.cancel();
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !ptr->req->get_config().close_on_connection_lost;
return !ptr->req->get_config().fail_on_connection_lost;
});
// Cancel own pings if there are any waiting.

View File

@@ -229,7 +229,7 @@ struct exec_op {
{
reenter (coro)
{
if (req->get_config().close_on_connection_lost && !conn->is_open()) {
if (req->get_config().fail_if_not_connected && !conn->is_open()) {
// The user doesn't want to wait for the connection to be
// stablished.
self.complete(error::not_connected, 0);

View File

@@ -181,7 +181,7 @@ public:
* called while there is no connection with Redis. The default
* behaviour is not to close requests.
*/
bool close_on_connection_lost = false;
bool fail_on_connection_lost = false;
/** @brief Coalesce this with other requests.
*
@@ -190,13 +190,19 @@ public:
* request will be sent individually.
*/
bool coalesce = true;
/** @brief If set to true, requests started with
* `connection::async_exe` will fail if the called happens
* before the connection with Redis is stablished.
*/
bool fail_if_not_connected = false;
};
/** @brief Constructor
*
* @param cfg Configuration options.
*/
explicit request(config cfg = config{false, true})
explicit request(config cfg = config{false, true, false})
: cfg_{cfg}
{}
@@ -360,8 +366,12 @@ public:
void reserve(std::size_t new_cap = 0)
{ payload_.reserve(new_cap); }
/// Returns a const reference to the config object.
auto get_config() const noexcept -> auto const& {return cfg_; }
/// Returns a reference to the config object.
auto get_config() noexcept -> auto& {return cfg_; }
private:
std::string payload_;
std::size_t commands_ = 0;

View File

@@ -28,7 +28,7 @@ using net::experimental::as_tuple;
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace net::experimental::awaitable_operators;
net::awaitable<void> send_after(std::shared_ptr<connection> db, std::chrono::milliseconds ms)
auto exec_after(std::shared_ptr<connection> conn, std::chrono::milliseconds ms) -> net::awaitable<void>
{
net::steady_timer st{co_await net::this_coro::executor};
st.expires_after(ms);
@@ -37,58 +37,76 @@ net::awaitable<void> send_after(std::shared_ptr<connection> db, std::chrono::mil
request req;
req.push("CLIENT", "PAUSE", ms.count());
auto [ec, n] = co_await db->async_exec(req, adapt(), as_tuple(net::use_awaitable));
auto [ec, n] = co_await conn->async_exec(req, adapt(), as_tuple(net::use_awaitable));
BOOST_TEST(!ec);
}
auto async_test_idle(std::chrono::milliseconds ms) -> net::awaitable<void>
{
connection::timeouts tms;
tms.resolve_timeout = std::chrono::seconds{1};
tms.connect_timeout = std::chrono::seconds{1};
tms.ping_interval = std::chrono::seconds{1};
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
endpoint ep{"127.0.0.1", "6379"};
boost::system::error_code ec;
co_await (
conn->async_run(ep, tms, net::redirect_error(net::use_awaitable, ec)) &&
net::co_spawn(ex, exec_after(conn, ms), net::use_awaitable)
);
BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout);
}
auto async_run_exec(std::chrono::milliseconds ms) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
connection::timeouts ts;
ts.ping_interval = 2 * ms;
ts.resolve_timeout = 2 * ms;
ts.connect_timeout = 2 * ms;
ts.ping_interval = 2 * ms;
ts.resp3_handshake_timeout = 2 * ms;
request req;
req.push("QUIT");
endpoint ep{"127.0.0.1", "6379"};
boost::system::error_code ec1, ec2;
co_await (
conn->async_run(ep, ts, net::redirect_error(net::use_awaitable, ec1)) &&
conn->async_exec(req, adapt(), net::redirect_error(net::use_awaitable, ec2))
);
BOOST_TEST(!ec2);
BOOST_CHECK_EQUAL(ec1, net::error::misc_errors::eof);
}
BOOST_AUTO_TEST_CASE(test_idle)
{
std::cout << boost::unit_test::framework::current_test_case().p_name << std::endl;
std::chrono::milliseconds ms{5000};
{
std::cout << "test_idle" << std::endl;
connection::timeouts cfg;
cfg.resolve_timeout = std::chrono::seconds{1};
cfg.connect_timeout = std::chrono::seconds{1};
cfg.ping_interval = std::chrono::seconds{1};
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
net::co_spawn(ioc.get_executor(), send_after(db, ms), net::detached);
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, cfg, [](auto ec){
BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout);
});
net::co_spawn(ioc.get_executor(), async_test_idle(ms), net::detached);
ioc.run();
}
//----------------------------------------------------------------
// Since we have paused the server above, we have to wait until the
// server is responsive again, so as not to cause other tests to
// fail.
{
// Since we have paused the server above, we have to wait until the
// server is responsive again, so as not to cause other tests to
// fail.
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
connection::timeouts cfg;
cfg.ping_interval = 2 * ms;
cfg.resolve_timeout = 2 * ms;
cfg.connect_timeout = 2 * ms;
cfg.ping_interval = 2 * ms;
cfg.resp3_handshake_timeout = 2 * ms;
request req;
req.push("QUIT");
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, req, adapt(), cfg, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
});
net::co_spawn(ioc.get_executor(), async_run_exec(ms), net::detached);
ioc.run();
}
}
@@ -128,10 +146,12 @@ BOOST_AUTO_TEST_CASE(test_wrong_data_type)
std::tuple<int> resp;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, req, adapt(resp), {}, [](auto ec, auto){
db->async_exec(req, adapt(resp), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, aedis::error::not_a_number);
});
db->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
ioc.run();
}
@@ -139,7 +159,8 @@ BOOST_AUTO_TEST_CASE(test_wrong_data_type)
BOOST_AUTO_TEST_CASE(test_not_connected)
{
std::cout << boost::unit_test::framework::current_test_case().p_name << std::endl;
request req{{true}};
request req;
req.get_config().fail_if_not_connected = true;
req.push("PING");
net::io_context ioc;

View File

@@ -34,8 +34,11 @@ void test_missing_push_reader1(bool coalesce)
request req{{false, coalesce}};
req.push("SUBSCRIBE", "channel");
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, req, adapt(), {}, [](auto ec, auto){
db->async_exec(req, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
db->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){
BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout);
});

View File

@@ -23,6 +23,9 @@ using connection = aedis::connection<>;
using error_code = boost::system::error_code;
using operation = aedis::operation;
// TODO: Add quit-cancel tests where we don't send a quit command but
// call async_run && timer.async_wait() and awaitable operators.
// Test if quit causes async_run to exit.
BOOST_AUTO_TEST_CASE(test_quit_no_coalesce)
{