2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Reduces the number of rescheduling needed to process a server sent push.

Performance improved by close to 10%.
This commit is contained in:
Marcelo Zimbres
2023-05-21 14:29:41 +02:00
parent f5f57e370b
commit 538ab8f35f
5 changed files with 52 additions and 32 deletions

View File

@@ -79,15 +79,16 @@ add_executable(echo_server_client benchmarks/cpp/asio/echo_server_client.cpp)
add_executable(echo_server_direct benchmarks/cpp/asio/echo_server_direct.cpp)
set(tests_cpp17
test_conn_quit
test_conn_tls
test_low_level
test_conn_exec_retry
test_conn_exec_error
test_request
test_run
test_low_level_sync
test_conn_check_health)
test_conn_quit
test_conn_tls
test_low_level
test_conn_exec_retry
test_conn_exec_error
test_request
test_run
test_low_level_sync
test_conn_check_health
)
set(tests_cpp20
test_conn_exec

View File

@@ -66,7 +66,7 @@
"CMAKE_BUILD_TYPE": "Debug",
"CMAKE_CXX_EXTENSIONS": "OFF",
"CMAKE_CXX_FLAGS": "-Wall -Wextra -fsanitize=address",
"CMAKE_CXX_COMPILER": "g++-11",
"CMAKE_CXX_COMPILER": "clang++-13",
"CMAKE_SHARED_LINKER_FLAGS": "-fsanitize=address",
"CMAKE_CXX_STANDARD_REQUIRED": "ON",
"PROJECT_BINARY_DIR": "${sourceDir}/build/clang++-13",

View File

@@ -43,7 +43,7 @@ namespace detail {
template <class Conn>
struct wait_receive_op {
Conn* conn;
Conn* conn_;
asio::coroutine coro{};
template <class Self>
@@ -52,14 +52,14 @@ struct wait_receive_op {
{
BOOST_ASIO_CORO_REENTER (coro)
{
BOOST_ASIO_CORO_YIELD
conn->channel_.async_send(system::error_code{}, 0, std::move(self));
BOOST_REDIS_CHECK_OP0(;);
conn_->channel_.cancel();
BOOST_ASIO_CORO_YIELD
conn->channel_.async_send(system::error_code{}, 0, std::move(self));
BOOST_REDIS_CHECK_OP0(;);
conn_->channel_.async_send(system::error_code{}, 0, std::move(self));
if (!conn_->is_open() || is_cancelled(self)) {
self.complete(!!ec ? ec : asio::error::operation_aborted);
return;
}
self.complete({});
}
}
@@ -158,7 +158,7 @@ public:
template <class Conn, class Adapter>
struct receive_op {
Conn* conn;
Conn* conn_;
Adapter adapter;
std::size_t read_size = 0;
asio::coroutine coro{};
@@ -171,27 +171,32 @@ struct receive_op {
{
BOOST_ASIO_CORO_REENTER (coro)
{
BOOST_ASIO_CORO_YIELD
conn->channel_.async_receive(std::move(self));
BOOST_REDIS_CHECK_OP1(;);
if (conn_->wait_read_op_notification_) {
BOOST_ASIO_CORO_YIELD
conn_->channel_.async_receive(std::move(self));
if (!conn_->is_open() || is_cancelled(self)) {
self.complete(!!ec ? ec : asio::error::operation_aborted, 0);
return;
}
}
if (conn->use_ssl())
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn->next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self));
if (conn_->use_ssl())
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self));
else
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn->next_layer().next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self));
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), adapter, std::move(self));
if (ec || is_cancelled(self)) {
conn->cancel(operation::run);
conn->cancel(operation::receive);
conn_->cancel(operation::run);
conn_->cancel(operation::receive);
self.complete(!!ec ? ec : asio::error::operation_aborted, {});
return;
}
read_size = n;
BOOST_ASIO_CORO_YIELD
conn->channel_.async_receive(std::move(self));
BOOST_REDIS_CHECK_OP1(;);
conn_->wait_read_op_notification_ = !conn_->is_next_maybe_push();
if (conn_->wait_read_op_notification_)
conn_->channel_.cancel();
self.complete({}, read_size);
return;
@@ -315,6 +320,7 @@ struct run_op {
{
conn->write_buffer_.clear();
conn->read_buffer_.clear();
conn->wait_read_op_notification_ = true;
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
@@ -991,6 +997,11 @@ private:
stream_->next_layer().close();
}
bool is_next_maybe_push() const noexcept
{
return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push);
}
auto is_open() const noexcept { return stream_->next_layer().is_open(); }
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
@@ -1009,6 +1020,9 @@ private:
std::string write_buffer_;
reqs_type reqs_;
std::size_t max_read_size_ = (std::numeric_limits<std::size_t>::max)();
// Flag that optimizes reading pushes.
bool wait_read_op_notification_ = true;
};
} // boost::redis

View File

@@ -9,6 +9,7 @@
#define BOOST_TEST_MODULE check-health
#include <boost/test/included/unit_test.hpp>
#include <iostream>
#include <thread>
#include "common.hpp"
namespace net = boost::asio;
@@ -119,5 +120,9 @@ BOOST_AUTO_TEST_CASE(check_health)
BOOST_TEST(!!res1);
BOOST_TEST(!!res2);
// Waits before exiting otherwise it might cause subsequent tests
// to fail.
std::this_thread::sleep_for(std::chrono::seconds{10});
}

View File

@@ -51,7 +51,7 @@ BOOST_AUTO_TEST_CASE(hello_priority)
conn->async_exec(req1, ignore, [&](auto ec, auto){
// Second callback to the called.
std::cout << "req1" << std::endl;
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(ec, boost::system::error_code{});
BOOST_TEST(!seen2);
BOOST_TEST(seen3);
seen1 = true;
@@ -60,7 +60,7 @@ BOOST_AUTO_TEST_CASE(hello_priority)
conn->async_exec(req2, ignore, [&](auto ec, auto){
// Last callback to the called.
std::cout << "req2" << std::endl;
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(ec, boost::system::error_code{});
BOOST_TEST(seen1);
BOOST_TEST(seen3);
seen2 = true;
@@ -71,7 +71,7 @@ BOOST_AUTO_TEST_CASE(hello_priority)
conn->async_exec(req3, ignore, [&](auto ec, auto){
// Callback that will be called first.
std::cout << "req3" << std::endl;
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(ec, boost::system::error_code{});
BOOST_TEST(!seen1);
BOOST_TEST(!seen2);
seen3 = true;