2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Accepts as valid responses to staged requests.

Before these changes the request had to be marked as written in order to
interpret incoming responses as belonging to that request. On fast
networks however, like on localhost and underload the responses might
arrive before the write operation completed.
This commit is contained in:
Marcelo Zimbres
2023-12-16 20:56:22 +01:00
committed by Marcelo
parent 168ee6148a
commit 3861c5de74
7 changed files with 126 additions and 85 deletions

View File

@@ -676,6 +676,15 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php.
## Changelog
### Boost 1.85
* Fixes [issue 170](https://github.com/boostorg/redis/issues/170).
Under load and on low-latency networks it is possible to start
receiving responses before the write operation completed and while
the request is still marked as staged and not written. This messes
up with the heuristics that classifies responses as unsolicied or
not.
### Boost 1.84 (First release in Boost)
* Deprecates the `async_receive` overload that takes a response. Users

View File

@@ -113,7 +113,7 @@ struct exec_op {
asio::coroutine coro{};
template <class Self>
void operator()(Self& self , system::error_code ec = {})
void operator()(Self& self , system::error_code ec = {}, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER (coro)
{
@@ -130,7 +130,6 @@ struct exec_op {
EXEC_OP_WAIT:
BOOST_ASIO_CORO_YIELD
info_->async_wait(std::move(self));
BOOST_ASSERT(ec == asio::error::operation_aborted);
if (info_->ec_) {
self.complete(info_->ec_, 0);
@@ -140,18 +139,18 @@ EXEC_OP_WAIT:
if (info_->stop_requested()) {
// Don't have to call remove_request as it has already
// been by cancel(exec).
return self.complete(ec, 0);
return self.complete(asio::error::operation_aborted, 0);
}
if (is_cancelled(self)) {
if (info_->is_written()) {
if (!info_->is_waiting()) {
using c_t = asio::cancellation_type;
auto const c = self.get_cancellation_state().cancelled();
if ((c & c_t::terminal) != c_t::none) {
// Cancellation requires closing the connection
// otherwise it stays in inconsistent state.
conn_->cancel(operation::run);
return self.complete(ec, 0);
return self.complete(asio::error::operation_aborted, 0);
} else {
// Can't implement other cancelation types, ignoring.
self.get_cancellation_state().clear();
@@ -163,7 +162,7 @@ EXEC_OP_WAIT:
} else {
// Cancelation can be honored.
conn_->remove_request(info_);
self.complete(ec, 0);
self.complete(asio::error::operation_aborted, 0);
return;
}
}
@@ -516,6 +515,7 @@ private:
using runner_type = runner<executor_type>;
using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
using exec_notifier_type = receive_channel_type;
auto use_ssl() const noexcept
{ return runner_.get_config().use_ssl;}
@@ -527,10 +527,10 @@ private:
{
BOOST_ASSERT(ptr != nullptr);
if (ptr->is_written()) {
return !ptr->req_->get_config().cancel_if_unresponded;
} else {
if (ptr->is_waiting()) {
return !ptr->req_->get_config().cancel_on_connection_lost;
} else {
return !ptr->req_->get_config().cancel_if_unresponded;
}
};
@@ -544,7 +544,7 @@ private:
reqs_.erase(point, std::end(reqs_));
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return ptr->reset_status();
return ptr->mark_waiting();
});
return ret;
@@ -555,7 +555,7 @@ private:
auto f = [](auto const& ptr)
{
BOOST_ASSERT(ptr != nullptr);
return ptr->is_written();
return !ptr->is_waiting();
};
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
@@ -615,25 +615,15 @@ private:
using node_type = resp3::basic_node<std::string_view>;
using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
enum class action
{
stop,
proceed,
none,
};
explicit req_info(request const& req, adapter_type adapter, executor_type ex)
: timer_{ex}
, action_{action::none}
: notifier_{ex, 1}
, req_{&req}
, adapter_{}
, expected_responses_{req.get_expected_responses()}
, status_{status::none}
, status_{status::waiting}
, ec_{{}}
, read_size_{0}
{
timer_.expires_at((std::chrono::steady_clock::time_point::max)());
adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
{
auto const i = req_->get_expected_responses() - expected_responses_;
@@ -643,18 +633,16 @@ private:
auto proceed()
{
timer_.cancel();
action_ = action::proceed;
notifier_.try_send(std::error_code{}, 0);
}
void stop()
{
timer_.cancel();
action_ = action::stop;
notifier_.close();
}
[[nodiscard]] auto is_waiting_write() const noexcept
{ return !is_written() && !is_staged(); }
[[nodiscard]] auto is_waiting() const noexcept
{ return status_ == status::waiting; }
[[nodiscard]] auto is_written() const noexcept
{ return status_ == status::written; }
@@ -668,27 +656,26 @@ private:
void mark_staged() noexcept
{ status_ = status::staged; }
void reset_status() noexcept
{ status_ = status::none; }
void mark_waiting() noexcept
{ status_ = status::waiting; }
[[nodiscard]] auto stop_requested() const noexcept
{ return action_ == action::stop;}
{ return !notifier_.is_open();}
template <class CompletionToken>
auto async_wait(CompletionToken token)
{
return timer_.async_wait(std::move(token));
return notifier_.async_receive(std::move(token));
}
//private:
enum class status
{ none
{ waiting
, staged
, written
};
timer_type timer_;
action action_;
exec_notifier_type notifier_;
request const* req_;
wrapped_adapter_type adapter_;
@@ -716,7 +703,7 @@ private:
void cancel_push_requests()
{
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
});
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
@@ -737,7 +724,7 @@ private:
if (info->req_->has_hello_priority()) {
auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
return e->is_waiting_write();
return e->is_waiting();
});
std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
@@ -781,7 +768,7 @@ private:
// Coalesces the requests and marks them staged. After a
// successful write staged requests will be marked as written.
auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
return !ri->is_waiting_write();
return !ri->is_waiting();
});
std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
@@ -798,7 +785,14 @@ private:
bool is_waiting_response() const noexcept
{
return !std::empty(reqs_) && reqs_.front()->is_written();
if (std::empty(reqs_))
return false;
// Under load and on low-latency networks we might start
// receiving responses before the write operation completed and
// the request is still maked as staged and not written. See
// https://github.com/boostorg/redis/issues/170
return !reqs_.front()->is_waiting();
}
void close()
@@ -814,36 +808,39 @@ private:
auto is_next_push()
{
// We handle unsolicited events in the following way
//
// 1. Its resp3 type is a push.
//
// 2. A non-push type is received with an empty requests
// queue. I have noticed this is possible (e.g. -MISCONF).
// I expect them to have type push so we can distinguish
// them from responses to commands, but it is a
// simple-error. If we are lucky enough to receive them
// when the command queue is empty we can treat them as
// server pushes, otherwise it is impossible to handle
// them properly
//
// 3. The request does not expect any response but we got
// one. This may happen if for example, subscribe with
// wrong syntax.
//
// Useful links:
BOOST_ASSERT(!read_buffer_.empty());
// Useful links to understand the heuristics below.
//
// - https://github.com/redis/redis/issues/11784
// - https://github.com/redis/redis/issues/6426
//
// - https://github.com/boostorg/redis/issues/170
BOOST_ASSERT(!read_buffer_.empty());
// The message's resp3 type is a push.
if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
return true;
return
(resp3::to_type(read_buffer_.front()) == resp3::type::push)
|| reqs_.empty()
|| (!reqs_.empty() && reqs_.front()->expected_responses_ == 0)
|| !is_waiting_response(); // Added to deal with MONITOR.
// This is non-push type and the requests queue is empty. I have
// noticed this is possible, for example with -MISCONF. I don't
// know why they are not sent with a push type so we can
// distinguish them from responses to commands. If we are lucky
// enough to receive them when the command queue is empty they
// can be treated as server pushes, otherwise it is impossible
// to handle them properly
if (reqs_.empty())
return true;
// The request does not expect any response but we got one. This
// may happen if for example, subscribe with wrong syntax.
if (reqs_.front()->expected_responses_ == 0)
return true;
// Added to deal with MONITOR and also to fix PR170 which
// happens under load and on low-latency networks, where we
// might start receiving responses before the write operation
// completed and the request is still maked as staged and not
// written.
return reqs_.front()->is_waiting();
}
auto get_suggested_buffer_growth() const noexcept

View File

@@ -47,31 +47,31 @@ class request {
public:
/// Request configuration options.
struct config {
/** \brief If `true`
* `boost::redis::connection::async_exec` will complete with error if the
* connection is lost. Affects only requests that haven't been
* sent yet.
/** \brief If `true` calls to `connection::async_exec` will
* complete with error if the connection is lost while the
* request hasn't been sent yet.
*/
bool cancel_on_connection_lost = true;
/** \brief If `true` the request will complete with
* boost::redis::error::not_connected if `async_exec` is called before
* the connection with Redis was established.
/** \brief If `true` `connection::async_exec` will complete with
* `boost::redis::error::not_connected` if the call happens
* before the connection with Redis was established.
*/
bool cancel_if_not_connected = false;
/** \brief If `false` `boost::redis::connection::async_exec` will not
/** \brief If `false` `connection::async_exec` will not
* automatically cancel this request if the connection is lost.
* Affects only requests that have been written to the socket
* but remained unresponded when `boost::redis::connection::async_run`
* completed.
* but remained unresponded when
* `boost::redis::connection::async_run` completed.
*/
bool cancel_if_unresponded = true;
/** \brief If this request has a `HELLO` command and this flag is
* `true`, the `boost::redis::connection` will move it to the front of
* the queue of awaiting requests. This makes it possible to
* send `HELLO` and authenticate before other commands are sent.
/** \brief If this request has a `HELLO` command and this flag
* is `true`, the `boost::redis::connection` will move it to the
* front of the queue of awaiting requests. This makes it
* possible to send `HELLO` and authenticate before other
* commands are sent.
*/
bool hello_with_priority = true;
};

View File

@@ -6,6 +6,7 @@
#include <boost/redis/connection.hpp>
#include <boost/system/errc.hpp>
#include <boost/asio/detached.hpp>
#define BOOST_TEST_MODULE conn-exec
#include <boost/test/included/unit_test.hpp>
#include <iostream>
@@ -17,12 +18,13 @@
// container.
namespace net = boost::asio;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::request;
using boost::redis::response;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
// Sends three requests where one of them has a hello with a priority
// set, which means it should be executed first.
@@ -153,3 +155,36 @@ BOOST_AUTO_TEST_CASE(correct_database)
BOOST_CHECK_EQUAL(cfg.database_index.value(), index);
}
BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170)
{
// See https://github.com/boostorg/redis/issues/170
std::string payload;
payload.resize(1024);
std::fill(std::begin(payload), std::end(payload), 'A');
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
auto cfg = make_test_config();
cfg.health_check_interval = std::chrono::seconds(0);
conn->async_run(cfg, {}, net::detached);
int counter = 0;
int const repeat = 8000;
for (int i = 0; i < repeat; ++i) {
auto req = std::make_shared<request>();
req->push("PING", payload);
conn->async_exec(*req, ignore, [req, &counter, conn](auto ec, auto) {
BOOST_TEST(!ec);
if (++counter == repeat)
conn->cancel();
});
}
ioc.run();
BOOST_CHECK_EQUAL(counter, repeat);
}

View File

@@ -57,12 +57,12 @@ BOOST_AUTO_TEST_CASE(request_retry_false)
auto c2 = [&](auto ec, auto){
std::cout << "c2" << std::endl;
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted);
};
auto c1 = [&](auto ec, auto){
std::cout << "c1" << std::endl;
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted);
};
auto c0 = [&](auto ec, auto){

View File

@@ -61,7 +61,7 @@ BOOST_AUTO_TEST_CASE(test_async_run_exits)
auto c3 = [](auto ec, auto)
{
std::clog << "c3: " << ec.message() << std::endl;
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
BOOST_CHECK_EQUAL(ec, boost::asio::error::operation_aborted);
};
auto c2 = [&](auto ec, auto)

View File

@@ -99,7 +99,7 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
std::cout << "ccc" << std::endl;
BOOST_CHECK_EQUAL(ec1, boost::system::errc::errc_t::operation_canceled);
BOOST_CHECK_EQUAL(ec1, boost::asio::error::operation_aborted);
}
BOOST_AUTO_TEST_CASE(test_reconnect_and_idle)