2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Lots of improvements.

This commit is contained in:
Marcelo Zimbres
2022-02-10 22:04:37 +01:00
parent 317690ee91
commit b04cc12d64
9 changed files with 242 additions and 2895 deletions

View File

@@ -20,7 +20,6 @@ check_PROGRAMS += lists
check_PROGRAMS += key_expiration
check_PROGRAMS += response_adapter
check_PROGRAMS += sync
check_PROGRAMS += multipurpose_client
check_PROGRAMS += test_offline
check_PROGRAMS += test_online
check_PROGRAMS += transaction
@@ -46,7 +45,6 @@ lists_SOURCES = $(top_srcdir)/examples/lists.cpp
key_expiration_SOURCES = $(top_srcdir)/examples/key_expiration.cpp
response_adapter_SOURCES = $(top_srcdir)/examples/response_adapter.cpp
sync_SOURCES = $(top_srcdir)/examples/sync.cpp
multipurpose_client_SOURCES = $(top_srcdir)/examples/multipurpose_client.cpp
commands_SOURCES = $(top_srcdir)/tools/commands.cpp
test_offline_SOURCES = $(top_srcdir)/tests/offline.cpp
test_online_SOURCES = $(top_srcdir)/tests/online.cpp

View File

@@ -82,10 +82,6 @@
servers that interact with users and Redis asynchronously over
long lasting connections using a higher level API.
- multipurpose_client.cpp: Shows how to use and experimental high
level redis client that keeps a long lasting connections to a
redis server. This is the starting point for the next examples.
- echo_server.cpp: Shows the basic principles behind asynchronous
communication with a database in an asynchronous server. In this
case, the server is a proxy between the user and Redis.

View File

@@ -12,11 +12,32 @@
#include <aedis/aedis.hpp>
#include <aedis/redis/command.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/adapt.hpp>
#include <boost/asio/async_result.hpp>
namespace aedis {
namespace resp3 {
namespace redis {
namespace experimental {
inline
auto adapt()
{
return [](command, resp3::type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec) { };
}
template <class T>
auto adapt(T& t)
{
return [adapter = resp3::adapt(t)](command, resp3::type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec) mutable
{ return adapter(t, aggregate_size, depth, data, size, ec); };
}
struct extended_ignore_adapter {
void operator()(redis::command, resp3::type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec) {}
};
/** \brief A high level redis client.
* \ingroup any
*
@@ -30,21 +51,14 @@ namespace experimental {
*/
class client : public std::enable_shared_from_this<client> {
public:
/** \brief The extended response adapter type.
*
* The difference between the adapter and extended_adapter
* concepts is that the extended get a command redis::parameter.
*/
using extented_adapter_type = std::function<void(redis::command, type, std::size_t, std::size_t, char const*, std::size_t, std::error_code&)>;
/// The type of the message callback.
using on_message_type = std::function<void(std::error_code ec, redis::command)>;
/// The type of the socket used by the client.
//using socket_type = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using socket_type = net::ip::tcp::socket;
private:
template <class T>
friend struct read_op;
struct request_info {
// Request size in bytes.
std::size_t size = 0;
@@ -54,6 +68,9 @@ private:
std::size_t cmds = 0;
};
// Buffer used in the read operations.
std::string read_buffer_;
// Requests payload.
std::string requests_;
@@ -70,23 +87,9 @@ private:
// next message in the output queue.
net::steady_timer timer_;
// Response adapter.
extented_adapter_type extended_adapter_ = [](redis::command, type, std::size_t, std::size_t, char const*, std::size_t, std::error_code&) {};
// Message callback.
on_message_type on_msg_ = [](std::error_code ec, redis::command) {};
// Set when the writer coroutine should stop.
bool stop_writer_ = false;
// A coroutine that keeps reading the socket. When a message
// arrives it calls on_message.
net::awaitable<void> reader();
// Write coroutine. It is kept suspended until there are messages
// to be sent.
net::awaitable<void> writer();
/* Prepares the back of the queue to receive further commands.
*
* If true is returned the request in the front of the queue can be
@@ -106,18 +109,19 @@ public:
/// Returns the executor used for I/O with Redis.
auto get_executor() {return socket_.get_executor();}
/** \brief Starts communication with Redis.
*
* This functions will send the hello command to Redis and spawn
* the read and write coroutines.
*
* \param socket A socket that is connected to redis.
*
* \returns This function returns an awaitable on which users should \c
* co_await. When the communication with Redis is lost the
* coroutine will finally co_return.
*/
net::awaitable<void> engage(socket_type socket);
void set_stream(socket_type socket)
{
socket_ = std::move(socket);
net::co_spawn(
socket_.get_executor(),
[self = shared_from_this()] { return self->writer(); },
net::detached);
}
// Write coroutine. It is kept suspended until there are messages
// to be sent.
net::awaitable<void> writer();
/** \brief Adds a command to the command queue.
*
@@ -126,11 +130,23 @@ public:
template <class... Ts>
void send(redis::command cmd, Ts const&... args);
/// Sets an extended response adapter.
void set_extended_adapter(extented_adapter_type adapter);
// Reads messages asynchronously.
template <
class ExtendedAdapter = extended_ignore_adapter,
class CompletionToken = net::use_awaitable_t<>
>
auto
async_read(
ExtendedAdapter extended_adapter = extended_ignore_adapter{},
CompletionToken&& token = net::use_awaitable_t<>{});
/// Sets the message callback;
void set_msg_callback(on_message_type on_msg);
// TODO: can we use cancellation.
void stop_writer()
{
stop_writer_ = true;
timer_.cancel();
socket_.close();
}
};
template <class... Ts>
@@ -153,6 +169,113 @@ void client::send(redis::command cmd, Ts const&... args)
timer_.cancel_one();
}
#include <boost/asio/yield.hpp>
template <class ExtendedAdapter>
struct read_op {
client* cli;
ExtendedAdapter adapter;
net::coroutine coro;
resp3::type t = resp3::type::invalid;
redis::command cmd = redis::command::unknown;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro) {
boost::ignore_unused(n);
if (ec) {
cli->stop_writer(); // TODO: implement as cancellation.
// TODO: close the socket?
self.complete(ec, redis::command::unknown);
return;
}
if (std::empty(cli->read_buffer_)) {
yield
net::async_read_until(
cli->socket_,
net::dynamic_buffer(cli->read_buffer_),
"\r\n",
std::move(self));
if (ec) {
cli->stop_writer();
// TODO: close the socket?
self.complete(ec, redis::command::unknown);
return;
}
}
assert(!std::empty(cli->read_buffer_));
t = resp3::detail::to_type(cli->read_buffer_.front());
if (t != resp3::type::push) {
assert(!std::empty(cli->commands_));
cmd = cli->commands_.front();
}
yield
resp3::async_read(
cli->socket_,
net::dynamic_buffer(cli->read_buffer_),
[a = adapter, c = cmd](resp3::type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec) mutable {a(c, t, aggregate_size, depth, data, size, ec);},
std::move(self));
if (ec) {
cli->stop_writer();
// TODO: close the socket?
self.complete(ec, redis::command::unknown);
return;
}
if (t != resp3::type::push) {
assert(!std::empty(cli->req_info_));
cli->commands_.pop();
if (--cli->req_info_.front().cmds == 0) {
cli->req_info_.pop();
if (!std::empty(cli->req_info_)) {
assert(!std::empty(cli->requests_));
yield
net::async_write(
cli->socket_,
net::buffer(cli->requests_.data(), cli->req_info_.front().size),
std::move(self));
if (ec) {
cli->stop_writer();
// TODO: close the socket?
self.complete(ec, redis::command::unknown);
return;
}
cli->requests_.erase(0, cli->req_info_.front().size);
cli->req_info_.front().size = 0;
if (cli->req_info_.front().cmds == 0)
cli->req_info_.pop();
}
}
}
self.complete({}, cmd);
}
}
};
#include <boost/asio/unyield.hpp>
template <class ExtendedAdapter, class CompletionToken>
auto client::async_read(ExtendedAdapter adapter, CompletionToken&& token)
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code, redis::command)
>(read_op<ExtendedAdapter>{this, adapter}, token, socket_);
}
} // experimental
} // resp3
} // redis
} // aedis

View File

@@ -8,11 +8,10 @@
#pragma once
#include <aedis/redis/experimental/client.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <aedis/resp3/detail/parser.hpp>
namespace aedis {
namespace resp3 {
namespace redis {
namespace experimental {
client::client(net::any_io_executor ex)
@@ -22,96 +21,14 @@ client::client(net::any_io_executor ex)
timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
net::awaitable<void> client::reader()
{
// Writes and reads continuosly from the socket.
for (std::string buffer;;) {
// Notice this coro can get scheduled while the write operation
// in the writer is ongoing. so we have to check.
while (!std::empty(req_info_) && req_info_.front().size != 0) {
assert(!std::empty(requests_));
boost::system::error_code ec;
co_await
net::async_write(
socket_,
net::buffer(requests_.data(), req_info_.front().size),
net::redirect_error(net::use_awaitable, ec));
requests_.erase(0, req_info_.front().size);
req_info_.front().size = 0;
if (req_info_.front().cmds != 0)
break; // We must await the responses.
req_info_.pop();
}
do { // Keeps reading while there are no messages queued waiting to be sent.
do { // Consumes the responses to all commands in the request.
boost::system::error_code ec;
auto const t =
co_await async_read_type(socket_, net::dynamic_buffer(buffer),
net::redirect_error(net::use_awaitable, ec));
if (ec) {
stop_writer_ = true;
timer_.cancel();
co_return;
}
if (t == type::push) {
auto adapter = [this](type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec)
{extended_adapter_(redis::command::unknown, t, aggregate_size, depth, data, size, ec);};
co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapter, net::redirect_error(net::use_awaitable, ec));
on_msg_(ec, redis::command::unknown);
if (ec) { // TODO: Return only on non aedis errors.
stop_writer_ = true;
timer_.cancel();
co_return;
}
} else {
auto adapter = [this](type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec)
{extended_adapter_(commands_.front(), t, aggregate_size, depth, data, size, ec);};
boost::system::error_code ec;
co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapter, net::redirect_error(net::use_awaitable, ec));
on_msg_(ec, commands_.front());
if (ec) { // TODO: Return only on non aedis errors.
stop_writer_ = true;
timer_.cancel();
co_return;
}
commands_.pop();
--req_info_.front().cmds;
}
} while (!std::empty(req_info_) && req_info_.front().cmds != 0);
// We may exit the loop above either because we are done
// with the response or because we received a server push
// while the queue was empty so we have to check before
// poping..
if (!std::empty(req_info_))
req_info_.pop();
} while (std::empty(req_info_));
}
}
net::awaitable<void> client::writer()
{
boost::system::error_code ec;
while (socket_.is_open()) {
ec = {};
co_await timer_.async_wait(net::redirect_error(net::use_awaitable, ec));
if (stop_writer_)
co_return;
// Notice this coro can get scheduled while the write operation
// in the reader is ongoing. so we have to check.
while (!std::empty(req_info_) && req_info_.front().size != 0) {
// TODO: Limit the size of the pipelines by spliting that last
// one in two if needed.
if (!std::empty(req_info_)) {
assert(req_info_.front().size != 0);
assert(!std::empty(requests_));
ec = {};
co_await net::async_write(
@@ -128,11 +45,14 @@ net::awaitable<void> client::writer()
requests_.erase(0, req_info_.front().size);
req_info_.front().size = 0;
if (req_info_.front().cmds != 0)
break;
req_info_.pop();
if (req_info_.front().cmds == 0)
req_info_.pop();
}
ec = {};
co_await timer_.async_wait(net::redirect_error(net::use_awaitable, ec));
if (stop_writer_)
co_return;
}
}
@@ -153,48 +73,7 @@ bool client::prepare_next()
return false;
}
void client::set_extended_adapter(extented_adapter_type adapter)
{
extended_adapter_ = adapter;
}
void client::set_msg_callback(on_message_type on_msg)
{
on_msg_ = on_msg;
}
net::awaitable<void> client::engage(socket_type socket)
{
using namespace aedis::net::experimental::awaitable_operators;
socket_ = std::move(socket);
std::string request;
auto sr = redis::make_serializer(request);
sr.push(redis::command::hello, 3);
boost::system::error_code ec;
co_await net::async_write(socket_, net::buffer(request), net::redirect_error(net::use_awaitable, ec));
if (ec)
co_return;
std::string buffer;
auto adapter = [this](type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec)
{extended_adapter_(redis::command::hello, t, aggregate_size, depth, data, size, ec);};
co_await
resp3::async_read(
socket_, net::dynamic_buffer(buffer), adapter, net::redirect_error(net::use_awaitable, ec));
on_msg_(ec, redis::command::hello);
if (ec)
co_return;
co_await (reader() && writer());
}
} // experimental
} // resp3
} // redis
} // aedis

File diff suppressed because it is too large Load Diff

View File

@@ -16,8 +16,8 @@
namespace net = aedis::net;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::resp3::adapt;
using aedis::resp3::experimental::client;
using aedis::resp3::node;
using aedis::resp3::type;
using aedis::user_session;
@@ -31,13 +31,8 @@ private:
std::vector<std::weak_ptr<user_session_base>> sessions_;
public:
void on_message(std::error_code ec, command cmd)
void on_message(command cmd)
{
if (ec) {
std::cerr << "Error: " << ec.message() << std::endl;
return;
}
switch (cmd) {
case command::incr:
{
@@ -70,12 +65,20 @@ public:
{ sessions_.push_back(session); }
};
net::awaitable<void> connection_manager(std::shared_ptr<client> db)
net::awaitable<void> run(std::shared_ptr<client> db, std::shared_ptr<receiver> recv)
{
try {
auto socket = co_await connect();
co_await db->engage(std::move(socket));
db->set_stream(co_await connect());
db->send(command::hello, 3);
db->send(command::subscribe, "channel");
for (auto adapter = recv->get_extended_adapter();;) {
auto const cmd = co_await db->async_read(adapter, net::use_awaitable);
recv->on_message(cmd);
}
} catch (std::exception const& e) {
db->stop_writer();
std::cerr << "Error: " << e.what() << std::endl;
}
}
@@ -86,14 +89,9 @@ net::awaitable<void> listener()
net::ip::tcp::acceptor acceptor(ex, {net::ip::tcp::v4(), 55555});
auto recv = std::make_shared<receiver>();
auto on_db_msg = [recv](std::error_code ec, command cmd)
{ recv->on_message(ec, cmd); };
auto db = std::make_shared<client>(ex);
db->set_extended_adapter(recv->get_extended_adapter());
db->set_msg_callback(on_db_msg);
net::co_spawn(ex, connection_manager(db), net::detached);
db->send(command::subscribe, "channel");
net::co_spawn(ex, run(db, recv), net::detached);
auto on_user_msg = [db](std::string const& msg)
{

View File

@@ -17,11 +17,11 @@
namespace net = aedis::net;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::user_session;
using aedis::user_session_base;
using aedis::resp3::node;
using aedis::resp3::adapt;
using aedis::resp3::experimental::client;
using aedis::resp3::type;
class receiver : public std::enable_shared_from_this<receiver> {
@@ -30,13 +30,8 @@ private:
std::queue<std::weak_ptr<user_session_base>> sessions_;
public:
void on_message(std::error_code ec, command cmd)
void on_message(command cmd)
{
if (ec) {
std::cerr << "Error: " << ec.message() << std::endl;
return;
}
switch (cmd) {
case command::ping:
{
@@ -68,12 +63,19 @@ public:
{ sessions_.push(session); }
};
net::awaitable<void> connection_manager(std::shared_ptr<client> db)
net::awaitable<void> run(std::shared_ptr<client> db, std::shared_ptr<receiver> recv)
{
try {
auto socket = co_await connect();
co_await db->engage(std::move(socket));
db->set_stream(co_await connect());
db->send(command::hello, 3);
for (auto adapter = recv->get_extended_adapter();;) {
auto const cmd = co_await db->async_read(adapter, net::use_awaitable);
recv->on_message(cmd);
}
} catch (std::exception const& e) {
db->stop_writer();
std::cerr << "Error: " << e.what() << std::endl;
}
}
@@ -84,13 +86,9 @@ net::awaitable<void> listener()
net::ip::tcp::acceptor acceptor(ex, {net::ip::tcp::v4(), 55555});
auto recv = std::make_shared<receiver>();
auto on_db_msg = [recv](std::error_code ec, command cmd)
{ recv->on_message(ec, cmd); };
auto db = std::make_shared<client>(ex);
db->set_extended_adapter(recv->get_extended_adapter());
db->set_msg_callback(on_db_msg);
net::co_spawn(ex, connection_manager(db), net::detached);
net::co_spawn(ex, run(db, recv), net::detached);
for (;;) {
auto socket = co_await acceptor.async_accept(net::use_awaitable);

View File

@@ -6,63 +6,52 @@
*/
#include <iostream>
#include <memory>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace resp3 = aedis::resp3;
using aedis::redis::command;
using aedis::redis::make_serializer;
using resp3::adapt;
namespace net = aedis::net;
using net::async_write;
using net::buffer;
using net::dynamic_buffer;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::redis::experimental::adapt;
net::awaitable<void> ping()
net::awaitable<void> run()
{
auto db = std::make_shared<client>(co_await net::this_coro::executor);
db->set_stream(co_await connect());
db->send(command::hello, 3);
db->send(command::ping, "O rato roeu a roupa do rei de Roma");
db->send(command::incr, "redis-client-counter");
db->send(command::quit);
std::string ping;
int incr;
co_await db->async_read();
co_await db->async_read(adapt(ping));
co_await db->async_read(adapt(incr));
co_await db->async_read();
boost::system::error_code ec;
co_await db->async_read(adapt(), net::redirect_error(net::use_awaitable, ec));
std::cout
<< "ping: " << ping << "\n"
<< "incr: " << incr << "\n";
}
int main()
{
try {
auto socket = co_await connect(); // See lib/net_utils.hpp
// Creates and sends the request.
std::string request;
auto sr = make_serializer(request);
sr.push(command::hello, 3);
sr.push(command::flushall);
sr.push(command::ping);
sr.push(command::incr, "key");
sr.push(command::quit);
co_await async_write(socket, buffer(request));
// Responses we are interested in.
int incr;
std::string ping;
// Reads the responses to ping and incr, ignore the others.
std::string buffer;
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // hello
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // flushall
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(ping));
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(incr));
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // quit
// Print the responses.
std::cout
<< "ping: " << ping << "\n"
<< "incr: " << incr << "\n";
net::io_context ioc{1};
net::co_spawn(ioc, run(), net::detached);
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
exit(EXIT_FAILURE);
}
}
int main()
{
net::io_context ioc;
co_spawn(ioc, ping(), net::detached);
ioc.run();
}

View File

@@ -1,69 +0,0 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <iostream>
#include <memory>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace net = aedis::net;
using aedis::redis::command;
using aedis::resp3::experimental::client;
using aedis::resp3::node;
using aedis::resp3::type;
net::awaitable<void> connection_manager(std::shared_ptr<client> db)
{
try {
auto socket = co_await connect();
co_await db->engage(std::move(socket));
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
}
int main()
{
try {
std::vector<node> resps;
auto on_msg = [&resps](std::error_code ec, command cmd)
{
if (ec) {
std::cerr << "Error: " << ec.message() << std::endl;
return;
}
std::cout << cmd << ":: " << resps.front().data << std::endl;
resps.clear();
};
net::io_context ioc{1};
// This adapter uses the general response that is suitable for
// all commands, so the command parameter will be ignored.
auto ext_adapter = [adapter = adapt(resps)](command, type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec) mutable
{ return adapter(t, aggregate_size, depth, data, size, ec); };
auto db = std::make_shared<client>(ioc.get_executor());
db->set_extended_adapter(ext_adapter);
db->set_msg_callback(on_msg);
net::co_spawn(ioc, connection_manager(db), net::detached);
db->send(command::ping, "O rato roeu a roupa do rei de Roma");
db->send(command::incr, "redis-client-counter");
db->send(command::quit);
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
exit(EXIT_FAILURE);
}
}