2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00
- CI fix.
- Renames request::fail_* to request::cancel_*.
- Adds a second parameter to async_run.
- Adds request::retry flag.
This commit is contained in:
Marcelo Zimbres
2022-10-09 11:40:22 +02:00
parent 4fb2b20954
commit 770e224917
18 changed files with 130 additions and 67 deletions

View File

@@ -706,14 +706,25 @@ The code used in the benchmarks can be found at
### master
* Renames `fail_on_connection_lost` to
`aedis::resp3::request::fail_on_connection_lost` and change its
behaviour: Setting it to true won't cause the request to be fail if
`async_exec` is called when there is no ongoing connection, which is
not the role of `aedis::resp3::request::fail_on_connection_lost`.
`aedis::resp3::request::cancel_on_connection_lost`. Now, it will
only cause connections to be canceled when `async_run` completes.
* Introduces `aedis::resp3::request::cancel_if_not_connected` which will
cause a request to be canceled if `async_exec` is called before a
connection has been stablished.
* Introduces new request flag `aedis::resp3::request::retry` that if
set to true will cause the request to not be canceled when it was
sent to Redis but remained unresponded after `async_run` completed.
It provides a way to avoid executing commands twice.
* Removes the `aedis::connection::async_run` overload that takes
request and adapter as parameters.
* Adds a second parameter to the `aedis::connection::async_run`
completion signature that contains the number of requests that have
been canceled on its completion.
### v1.1.0/1
* Removes `coalesce_requests` from the `aedis::connection::config`, it

View File

@@ -9,6 +9,7 @@
//
#include <cstdio>
#include <iostream>
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -56,7 +56,7 @@ auto main() -> int
net::io_context ioc{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO};
auto db = std::make_shared<connection>(ioc);
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, {}, [&](auto const& ec) {
db->async_run(ep, {}, [&](auto const& ec, auto) {
std::clog << ec.message() << std::endl;
ioc.stop();
});

View File

@@ -41,8 +41,8 @@ auto main() -> int
conn.next_layer().set_verify_callback(verify_certificate);
request req;
req.get_config().fail_if_not_connected = false;
req.get_config().fail_on_connection_lost = true;
req.get_config().cancel_if_not_connected = false;
req.get_config().cancel_on_connection_lost = true;
req.push("PING");
req.push("QUIT");

View File

@@ -60,8 +60,8 @@ net::awaitable<void> push_receiver(std::shared_ptr<connection> conn)
net::awaitable<void> reconnect(std::shared_ptr<connection> conn)
{
request req;
req.get_config().fail_if_not_connected = false;
req.get_config().fail_on_connection_lost = true;
req.get_config().cancel_if_not_connected = false;
req.get_config().cancel_on_connection_lost = true;
req.push("SUBSCRIBE", "channel");
stimer timer{co_await net::this_coro::executor};

View File

@@ -53,8 +53,8 @@ net::awaitable<endpoint> resolve()
};
request req;
req.get_config().fail_if_not_connected = false;
req.get_config().fail_on_connection_lost = true;
req.get_config().cancel_if_not_connected = false;
req.get_config().cancel_on_connection_lost = true;
req.push("SENTINEL", "get-master-addr-by-name", "mymaster");
req.push("QUIT");
@@ -88,8 +88,8 @@ net::awaitable<endpoint> resolve()
net::awaitable<void> reconnect(std::shared_ptr<connection> conn)
{
request req;
req.get_config().fail_if_not_connected = false;
req.get_config().fail_on_connection_lost = true;
req.get_config().cancel_if_not_connected = false;
req.get_config().cancel_on_connection_lost = true;
req.push("SUBSCRIBE", "channel");
auto ex = co_await net::this_coro::executor;

View File

@@ -132,7 +132,7 @@ public:
* The completion token must have the following signature
*
* @code
* void f(boost::system::error_code);
* void f(boost::system::error_code, std::size_t);
* @endcode
*/
template <class CompletionToken = boost::asio::default_completion_token_t<executor_type>>

View File

@@ -52,8 +52,8 @@ public:
, push_channel_{ex}
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
{
req_.get_config().fail_if_not_connected = false;
req_.get_config().fail_on_connection_lost = true;
req_.get_config().cancel_if_not_connected = false;
req_.get_config().cancel_on_connection_lost = true;
writer_timer_.expires_at(std::chrono::steady_clock::time_point::max());
read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
@@ -84,17 +84,6 @@ public:
writer_timer_.cancel();
ping_timer_.cancel();
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !ptr->req->get_config().fail_on_connection_lost;
});
// Cancel own pings if there are any waiting.
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->stop = true;
ptr->timer.cancel();
});
reqs_.erase(point, std::end(reqs_));
return 1U;
}
case operation::receive_push:
@@ -106,6 +95,32 @@ public:
}
}
std::size_t cancel_requests()
{
auto cond = [](auto const& ptr)
{
if (ptr->req->get_config().cancel_on_connection_lost)
return false;
return !(!ptr->req->get_config().retry && ptr->written);
};
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
auto const ret = std::distance(point, std::end(reqs_));
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->stop = true;
ptr->timer.cancel();
});
reqs_.erase(point, std::end(reqs_));
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return ptr->written = false;
});
return ret;
}
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
@@ -143,7 +158,7 @@ public:
ep_ = std::move(ep);
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
, void(boost::system::error_code, std::size_t)
>(detail::run_op<Derived, Timeouts>{&derived(), ts}, token, resv_);
}

View File

@@ -229,7 +229,7 @@ struct exec_op {
{
reenter (coro)
{
if (req->get_config().fail_if_not_connected && !conn->is_open()) {
if (req->get_config().cancel_if_not_connected && !conn->is_open()) {
// The user doesn't want to wait for the connection to be
// stablished.
self.complete(error::not_connected, 0);
@@ -422,7 +422,7 @@ struct run_op {
conn->async_resolve_with_timeout(ts.resolve_timeout, std::move(self));
if (ec) {
conn->cancel(operation::run);
self.complete(ec);
self.complete(ec, conn->cancel_requests());
return;
}
@@ -430,7 +430,7 @@ struct run_op {
conn->derived().async_connect(conn->endpoints_, ts, conn->ping_timer_, std::move(self));
if (ec) {
conn->cancel(operation::run);
self.complete(ec);
self.complete(ec, conn->cancel_requests());
return;
}
@@ -449,7 +449,7 @@ struct run_op {
if (ec) {
conn->cancel(operation::run);
self.complete(ec);
self.complete(ec, conn->cancel_requests());
return;
}
@@ -457,18 +457,16 @@ struct run_op {
if (!conn->expect_role(conn->ep_.role)) {
conn->cancel(operation::run);
self.complete(error::unexpected_server_role);
self.complete(error::unexpected_server_role, conn->cancel_requests());
return;
}
conn->write_buffer_.clear();
conn->cmds_ = 0;
std::for_each(std::begin(conn->reqs_), std::end(conn->reqs_), [](auto const& ptr) {
return ptr->written = false;
});
yield conn->async_start(ts, std::move(self));
self.complete(ec);
self.complete(ec, conn->cancel_requests());
}
}
};

View File

@@ -148,8 +148,8 @@ void add_separator(Request& to)
}
} // detail
/** @brief Creates Redis requests.
* @ingroup high-level-api
/** \brief Creates Redis requests.
* \ingroup high-level-api
*
* A request is composed of one or more Redis commands and is
* referred to in the redis documentation as a pipeline, see
@@ -165,25 +165,25 @@ void add_separator(Request& to)
* co_await async_write(socket, buffer(r));
* @endcode
*
* @remarks
* \remarks
*
* @li Non-string types will be converted to string by using \c
* \li Non-string types will be converted to string by using \c
* to_bulk, which must be made available over ADL.
* @li Uses std::string as internal storage.
* \li Uses std::string as internal storage.
*/
class request {
public:
/// Request configuration options.
struct config {
/** @brief If set to true, requests started with
/** \brief If set to true, requests started with
* `connection::async_exe` will fail either if the connection is
* lost while the request is pending or if `async_exec` is
* called while there is no connection with Redis. The default
* behaviour is not to close requests.
*/
bool fail_on_connection_lost = false;
bool cancel_on_connection_lost = false;
/** @brief Coalesce this with other requests.
/** \brief Coalesce this with other requests.
*
* If true this request will be coalesced with other requests,
* see https://redis.io/topics/pipelining. If false, this
@@ -191,20 +191,25 @@ public:
*/
bool coalesce = true;
/** @brief If set to true, requests started with
/** \brief If set to true, requests started with
* `connection::async_exe` will fail if the called happens
* before the connection with Redis is stablished.
*/
bool fail_if_not_connected = false;
bool cancel_if_not_connected = false;
// TODO: Add retry flag.
/** \brief If true, the implementation will resend this
* request if it remained unresponded when
* `aedis::connection::async_run` completed. Has effect only if
* cancel_on_connection_lost is false.
*/
bool retry = true;
};
/** @brief Constructor
*
* @param cfg Configuration options.
*/
explicit request(config cfg = config{false, true, false})
explicit request(config cfg = config{false, true, false, true})
: cfg_{cfg}
{}

View File

@@ -6,6 +6,7 @@
#include <iostream>
#include <boost/asio.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
#include <boost/system/errc.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
@@ -24,7 +25,6 @@ using endpoint = aedis::endpoint;
using error_code = boost::system::error_code;
using net::experimental::as_tuple;
#ifdef BOOST_ASIO_HAS_CO_AWAIT
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace net::experimental::awaitable_operators;
@@ -53,4 +53,6 @@ BOOST_AUTO_TEST_CASE(test_cancel_run)
net::co_spawn(ioc.get_executor(), async_test_cancel_run(), net::detached);
ioc.run();
}
#else
int main(){}
#endif

View File

@@ -32,7 +32,7 @@ error_code test_async_run(endpoint ep, connection::timeouts cfg = {})
net::io_context ioc;
connection db{ioc};
error_code ret;
db.async_run(ep, cfg, [&](auto ec) { ret = ec; });
db.async_run(ep, cfg, [&](auto ec, auto) { ret = ec; });
ioc.run();
return ret;
}

View File

@@ -149,7 +149,7 @@ BOOST_AUTO_TEST_CASE(test_wrong_data_type)
db->async_exec(req, adapt(resp), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, aedis::error::not_a_number);
});
db->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){
db->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
@@ -160,7 +160,7 @@ BOOST_AUTO_TEST_CASE(test_not_connected)
{
std::cout << boost::unit_test::framework::current_test_case().p_name << std::endl;
request req;
req.get_config().fail_if_not_connected = true;
req.get_config().cancel_if_not_connected = true;
req.push("PING");
net::io_context ioc;
@@ -171,3 +171,34 @@ BOOST_AUTO_TEST_CASE(test_not_connected)
ioc.run();
}
BOOST_AUTO_TEST_CASE(test_retry)
{
std::cout << boost::unit_test::framework::current_test_case().p_name << std::endl;
request req1;
req1.get_config().cancel_on_connection_lost = true;
req1.push("CLIENT", "PAUSE", 7000);
request req2;
req2.get_config().cancel_on_connection_lost = false;
req2.get_config().retry = false;
req2.push("PING");
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
db->async_exec(req1, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
db->async_exec(req2, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
db->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout);
});
ioc.run();
}

View File

@@ -38,7 +38,7 @@ void test_missing_push_reader1(bool coalesce)
BOOST_TEST(!ec);
});
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout);
});
@@ -57,7 +57,7 @@ void test_missing_push_reader2(bool coalesce)
BOOST_TEST(!ec);
});
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout);
});
@@ -77,7 +77,7 @@ void test_missing_push_reader3(bool coalesce)
BOOST_TEST(!ec);
});
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout);
});
@@ -135,7 +135,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
BOOST_CHECK_EQUAL(ec, boost::asio::experimental::error::channel_errors::channel_cancelled);
});
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec){
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
@@ -158,7 +158,7 @@ void test_push_is_received1(bool coalesce)
BOOST_TEST(!ec);
});
conn->async_run({"127.0.0.1", "6379"}, {}, [conn](auto ec){
conn->async_run({"127.0.0.1", "6379"}, {}, [conn](auto ec, auto){
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
conn->cancel(operation::receive_push);
});

View File

@@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE(test_quit_no_coalesce)
});
endpoint ep{"127.0.0.1", "6379"};
conn->async_run(ep, {}, [conn](auto ec){
conn->async_run(ep, {}, [conn](auto ec, auto){
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
conn->cancel(operation::exec);
});
@@ -74,7 +74,7 @@ void test_quit2(bool coalesce)
BOOST_TEST(!ec);
});
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec) {
conn->async_run({"127.0.0.1", "6379"}, {}, [](auto ec, auto) {
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
});

View File

@@ -48,7 +48,7 @@ BOOST_AUTO_TEST_CASE(test_quit_coalesce)
});
endpoint ep{"127.0.0.1", "6379"};
db->async_run(ep, {}, [db](auto ec){
db->async_run(ep, {}, [db](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
db->cancel(operation::exec);
});

View File

@@ -6,6 +6,7 @@
#include <iostream>
#include <boost/asio.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
#define BOOST_TEST_MODULE low level
#include <boost/test/included/unit_test.hpp>
@@ -21,7 +22,6 @@ using connection = aedis::connection<>;
using endpoint = aedis::endpoint;
using error_code = boost::system::error_code;
#ifdef BOOST_ASIO_HAS_CO_AWAIT
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace boost::asio::experimental::awaitable_operators;
@@ -65,8 +65,8 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
boost::system::error_code ec1, ec2;
request req1;
req1.get_config().fail_if_not_connected = false;
req1.get_config().fail_on_connection_lost = true;
req1.get_config().cancel_if_not_connected = false;
req1.get_config().cancel_on_connection_lost = true;
req1.push("CLIENT", "PAUSE", 7000);
co_await (
@@ -78,8 +78,8 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
BOOST_CHECK_EQUAL(ec2, aedis::error::idle_timeout);
request req2;
req2.get_config().fail_if_not_connected = false;
req2.get_config().fail_on_connection_lost = true;
req2.get_config().cancel_if_not_connected = false;
req2.get_config().cancel_on_connection_lost = true;
req2.push("QUIT");
co_await (

View File

@@ -36,7 +36,7 @@ boost::system::error_code hello_fail(endpoint ep)
conn->next_layer().set_verify_mode(net::ssl::verify_peer);
conn->next_layer().set_verify_callback(verify_certificate);
boost::system::error_code ret;
conn->async_run(ep, {}, [&](auto ec) {
conn->async_run(ep, {}, [&](auto ec, auto) {
ret = ec;
});