2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Adds coalesce option.

This commit is contained in:
Marcelo Zimbres
2022-06-17 22:51:51 +02:00
parent 452589d4e7
commit 5328cdff9a
9 changed files with 59 additions and 48 deletions

View File

@@ -15,7 +15,8 @@ namespace adapter {
template <class T>
using adapter_t = typename detail::adapter_t<T>;
/** \brief Creates a dummy response adapter.
/** \internal
\brief Creates a dummy response adapter.
\ingroup any
The adapter returned by this function ignores responses. It is
@@ -41,7 +42,8 @@ inline
auto adapt() noexcept
{ return detail::response_traits<void>::adapt(); }
/** \brief Adapts user data to read operations.
/** \internal
* \brief Adapts user data to read operations.
* \ingroup any
*
* STL containers, \c std::tuple and built-in types are supported and

View File

@@ -64,6 +64,9 @@ public:
/// The maximum size allwed in a read operation.
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
// Whether to coalesce requests.
bool coalesce_requests = true;
};
/** \brief Constructor.
@@ -265,26 +268,24 @@ private:
struct req_info {
boost::asio::steady_timer timer;
resp3::request const* req = nullptr;
std::size_t n_cmds = 0;
bool stop = false;
bool expects_response() const noexcept { return n_cmds != 0;}
void pop() noexcept
{
--n_cmds;
}
void pop() noexcept { --n_cmds; }
};
bool add_request(resp3::request const& req)
{
BOOST_ASSERT(!req.payload().empty());
auto const empty = reqs_.empty();
reqs_.push_back(make_req_info(req.commands().size()));
reqs_.push_back(make_req_info());
reqs_.back()->timer.expires_at(std::chrono::steady_clock::time_point::max());
payload_next_ += req.payload();
for (auto cmd : req.commands())
cmds_next_.push(cmd.first);
reqs_.back()->req = &req;
reqs_.back()->n_cmds = req.commands().size();
reqs_.back()->stop = false;
return empty && socket_ != nullptr;
}
@@ -403,16 +404,12 @@ private:
>(detail::exec_exit_op<connection>{this}, token, resv_);
}
std::shared_ptr<req_info> make_req_info(std::size_t cmds)
std::shared_ptr<req_info> make_req_info()
{
if (pool_.empty())
return std::make_shared<req_info>(
boost::asio::steady_timer{resv_.get_executor()},
cmds, false);
return std::make_shared<req_info>(boost::asio::steady_timer{resv_.get_executor()});
auto ret = pool_.back();
ret->n_cmds = cmds;
ret->stop = false;
pool_.pop_back();
return ret;
}
@@ -422,6 +419,22 @@ private:
pool_.push_back(info);
}
void coalesce_requests()
{
// Coaleces all requests: Copies the request to the variables
// that won't be touched while async_write is suspended.
BOOST_ASSERT(payload_.empty());
BOOST_ASSERT(!reqs_.empty());
auto const size = cfg_.coalesce_requests ? reqs_.size() : 1;
for (auto i = 0UL; i < size; ++i) {
payload_ += reqs_.at(i)->req->payload();
for (auto cmd : reqs_.at(i)->req->commands()) {
cmds_.push(cmd.first);
}
}
}
using channel_type = boost::asio::experimental::channel<void(boost::system::error_code, std::size_t)>;
// IO objects
@@ -440,9 +453,7 @@ private:
std::string read_buffer_;
std::string payload_;
std::string payload_next_;
std::queue<command> cmds_;
std::queue<command> cmds_next_;
std::deque<std::shared_ptr<req_info>> reqs_;
std::vector<std::shared_ptr<req_info>> pool_;

View File

@@ -222,15 +222,7 @@ struct exec_write_op {
{
reenter (coro)
{
BOOST_ASSERT(!cli->payload_next_.empty());
// Copies the request to the variables that won't be touched
// while async_write is suspended.
std::swap(cli->cmds_next_, cli->cmds_);
std::swap(cli->payload_next_, cli->payload_);
cli->cmds_next_ = {};
cli->payload_next_.clear();
cli->coalesce_requests();
cli->write_timer_.expires_after(cli->cfg_.write_timeout);
yield aedis::detail::async_write(*cli->socket_, cli->write_timer_, boost::asio::buffer(cli->payload_), std::move(self));
if (ec) {

View File

@@ -97,8 +97,9 @@ struct add_bulk_impl<boost::hana::tuple<Ts...>> {
} // detail
/** @brief Adds a resp3 header to the request.
* @ingroup any
/** \internal
* \brief Adds a resp3 header to the request.
* \ingroup any
*
* See mystruct.hpp for an example.
*/
@@ -142,8 +143,9 @@ void add_blob(Request& to, boost::string_view blob)
to += separator;
}
/** @brief Adds a separator to the request.
* @ingroup any
/** \internal
* \brief Adds a separator to the request.
* \ingroup any
*
* See mystruct.hpp for an example.
*/

View File

@@ -18,7 +18,8 @@
namespace aedis {
namespace resp3 {
/** \brief Reads a complete response to a command sychronously.
/** \internal
* \brief Reads a complete response to a command sychronously.
* \ingroup any
*
* This function reads a complete response to a command or a
@@ -102,7 +103,8 @@ read(
return consumed;
}
/** \brief Reads a complete response to a command sychronously.
/** \internal
* \brief Reads a complete response to a command sychronously.
* \ingroup any
*
* Same as the error_code overload but throws on error.
@@ -126,7 +128,8 @@ read(
return n;
}
/** @brief Reads a complete response to a Redis command asynchronously.
/** \internal
* \brief Reads a complete response to a Redis command asynchronously.
* \ingroup any
*
* This function reads a complete response to a command or a

View File

@@ -35,10 +35,12 @@ net::awaitable<void> echo_loop(tcp_socket socket, std::shared_ptr<connection> db
}
}
net::awaitable<void> listener()
net::awaitable<void> listener(bool coalesce_requests)
{
auto ex = co_await net::this_coro::executor;
auto db = std::make_shared<connection>(ex);
connection::config cfg;
cfg.coalesce_requests = coalesce_requests;
auto db = std::make_shared<connection>(ex, cfg);
db->async_run("127.0.0.1", "6379", net::detached);
tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555});
@@ -46,11 +48,11 @@ net::awaitable<void> listener()
net::co_spawn(ex, echo_loop(co_await acc.async_accept(), db), net::detached);
}
int main()
int main(int argc, char* argv[])
{
try {
net::io_context ioc;
co_spawn(ioc, listener(), net::detached);
co_spawn(ioc, listener(argc == 1), net::detached);
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;

View File

@@ -44,9 +44,9 @@ void expect_error(boost::system::error_code a, T expected = {}, std::string cons
}
inline
void expect_no_error(boost::system::error_code ec)
void expect_no_error(boost::system::error_code ec, std::string const& msg)
{
expect_error(ec, boost::system::error_code{});
expect_error(ec, boost::system::error_code{}, msg);
}
template <class T>

View File

@@ -69,7 +69,7 @@ void test_quit()
request req;
req.push(command::quit);
db->async_exec(req, aedis::adapt(), [](auto ec, auto r){
expect_no_error(ec);
expect_no_error(ec, "test_quit");
});
db->async_run("127.0.0.1", "6379", [](auto ec){
@@ -100,7 +100,7 @@ push_consumer1(std::shared_ptr<connection> db)
{
{
auto [ec, n] = co_await db->async_read_push(aedis::adapt(), as_tuple(net::use_awaitable));
expect_no_error(ec);
expect_no_error(ec, "push_consumer");
}
{
@@ -135,7 +135,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
request req;
req.push(command::quit);
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec);
expect_no_error(ec, "run5");
});
auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable));
@@ -146,7 +146,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
request req;
req.push(command::quit);
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec);
expect_no_error(ec, "run5");
});
auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable));
@@ -235,7 +235,7 @@ void test_idle()
req.push(command::client, "PAUSE", 5000);
db->async_exec(req, aedis::adapt(), [](auto ec, auto r){
expect_no_error(ec);
expect_no_error(ec, "test_idle");
});
db->async_run("127.0.0.1", "6379", [](auto ec){
@@ -358,7 +358,6 @@ int main()
test_no_push_reader1();
test_no_push_reader2();
test_no_push_reader3();
// TODO: Reconnect is not working.
//test_reconnect();
test_exec_while_processing();

View File

@@ -31,7 +31,7 @@ example(boost::asio::ip::tcp::endpoint ep, std::string msg, int n)
dbuffer.consume(n);
}
std::printf("Ok: %s", msg.data());
//std::printf("Ok: %s", msg.data());
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}