2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Makes the connection full-duplex.

This commit is contained in:
Marcelo Zimbres
2023-02-12 19:14:07 +01:00
parent b93f36163d
commit 1b60eeb352
4 changed files with 58 additions and 40 deletions

View File

@@ -190,6 +190,8 @@ private:
// Notice this must come before the for-each below.
cancel_push_requests();
// There is small optimization possible here: traverse only the
// partition of unwritten requests instead of them all.
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
if (ptr->is_staged())
ptr->mark_written();
@@ -227,6 +229,9 @@ private:
action_ = action::stop;
}
[[nodiscard]] auto is_waiting_write() const noexcept
{ return !is_written() && !is_staged(); }
[[nodiscard]] auto is_written() const noexcept
{ return status_ == status::written; }
@@ -308,19 +313,24 @@ private:
reqs_.erase(point, std::end(reqs_));
}
[[nodiscard]] bool is_writing() const noexcept
{
return !write_buffer_.empty();
}
void add_request_info(std::shared_ptr<req_info> const& info)
{
reqs_.push_back(info);
if (info->get_request().has_hello_priority()) {
auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
return !e->is_written() && !e->is_staged();
return e->is_waiting_write();
});
std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
}
if (derived().is_open() && !is_waiting_response() && write_buffer_.empty())
if (derived().is_open() && !is_writing())
writer_timer_.cancel();
}
@@ -360,22 +370,29 @@ private:
ri.mark_staged();
}
void coalesce_requests()
[[nodiscard]] bool coalesce_requests()
{
// Coalesce the requests and marks them staged. After a
// Coalesces the requests and marks them staged. After a
// successful write staged requests will be marked as written.
BOOST_ASSERT(write_buffer_.empty());
BOOST_ASSERT(!reqs_.empty());
std::size_t pos = 0;
for (; pos < std::size(reqs_); ++pos)
if (reqs_.at(pos)->is_waiting_write())
break;
stage_request(*reqs_.at(0));
if (pos == std::size(reqs_))
return false;
for (std::size_t i = 1; i < std::size(reqs_); ++i) {
stage_request(*reqs_.at(pos));
for (std::size_t i = pos + 1; i < std::size(reqs_); ++i) {
if (!reqs_.at(i - 1)->get_request().get_config().coalesce ||
!reqs_.at(i - 0)->get_request().get_config().coalesce) {
break;
}
stage_request(*reqs_.at(i));
}
return true;
}
bool is_waiting_response() const noexcept

View File

@@ -235,13 +235,11 @@ EXEC_OP_WAIT:
BOOST_ASSERT(!conn->reqs_.empty());
conn->reqs_.pop_front();
if (!conn->is_waiting_response()) {
conn->read_timer_.cancel_one();
if (!conn->reqs_.empty())
conn->writer_timer_.cancel_one();
} else {
if (conn->is_waiting_response()) {
BOOST_ASSERT(!conn->reqs_.empty());
conn->reqs_.front()->proceed();
} else {
conn->read_timer_.cancel_one();
}
self.complete({}, read_size);
@@ -301,8 +299,7 @@ struct writer_op {
BOOST_ASIO_CORO_REENTER (coro) for (;;)
{
while (!conn->reqs_.empty() && !conn->is_waiting_response() && conn->write_buffer_.empty()) {
conn->coalesce_requests();
while (conn->coalesce_requests()) {
BOOST_ASIO_CORO_YIELD
asio::async_write(conn->next_layer(), asio::buffer(conn->write_buffer_), std::move(self));
AEDIS_CHECK_OP0(conn->cancel(operation::run););

View File

@@ -58,19 +58,11 @@ auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
req1.push("BLPOP", "any", 3);
// Should not be canceled.
conn->async_exec(req1, gresp, [](auto ec, auto){
bool seen = false;
conn->async_exec(req1, gresp, [&](auto ec, auto) mutable{
std::cout << "async_exec (1): " << ec.message() << std::endl;
BOOST_TEST(!ec);
});
request req2;
req2.get_config().coalesce = false;
req2.push("PING", "second");
// Should be canceled.
conn->async_exec(req2, gresp, [](auto ec, auto){
std::cout << "async_exec (2): " << ec.message() << std::endl;
BOOST_CHECK_EQUAL(ec, net::error::basic_errors::operation_aborted);
seen = true;
});
// Will complete while BLPOP is pending.
@@ -88,6 +80,7 @@ auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
co_await conn->async_exec(req3, gresp, net::redirect_error(net::use_awaitable, ec1));
BOOST_TEST(!ec1);
BOOST_TEST(seen);
}
auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>

View File

@@ -43,21 +43,32 @@ BOOST_AUTO_TEST_CASE(test_quit_no_coalesce)
req2.get_config().coalesce = false;
req2.push("QUIT");
conn.async_exec(req1, ignore, [](auto ec, auto){
BOOST_TEST(!ec);
});
conn.async_exec(req2, ignore, [](auto ec, auto) {
BOOST_TEST(!ec);
});
conn.async_exec(req1, ignore, [](auto ec, auto){
request req3;
req3.get_config().cancel_if_not_connected = true;
req3.push("PING");
auto c3 = [](auto ec, auto)
{
std::cout << "3--> " << ec.message() << std::endl;
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_exec(req1, ignore, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_exec(req1, ignore, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
};
auto c2 = [&](auto ec, auto)
{
std::cout << "2--> " << ec.message() << std::endl;
BOOST_TEST(!ec);
conn.async_exec(req3, ignore, c3);
};
auto c1 = [&](auto ec, auto)
{
std::cout << "1--> " << ec.message() << std::endl;
BOOST_TEST(!ec);
conn.async_exec(req2, ignore, c2);
};
conn.async_exec(req1, ignore, c1);
conn.async_run([&](auto ec){
BOOST_TEST(!ec);