2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

More progresses with the examples.

This commit is contained in:
Marcelo Zimbres
2022-01-17 12:50:00 +01:00
parent 3b7aa4dcd3
commit 0381ef605b
6 changed files with 176 additions and 209 deletions

View File

@@ -16,97 +16,84 @@
#include "lib/user_session.hpp"
#include "src.hpp"
namespace net = aedis::net;
using aedis::resp3::client_base;
using aedis::command;
using aedis::user_session;
using aedis::user_session_base;
using aedis::resp3::node;
using aedis::resp3::detail::response_traits;
using aedis::resp3::type;
namespace net = aedis::net;
using aedis::resp3::client_base;
// TODO: Use all necessary data types.
struct adapter_helper {
using adapter_type = response_traits<std::vector<node>>::adapter_type;
adapter_type adapter;
// Holds the information that is needed when a response to a
// request arrives. See client_base.hpp for more details on the
// required fields in this struct.
struct response_id {
// The redis command that corresponds to this command.
command cmd = command::unknown;
// Pointer to the response.
// TODO: Not needed here. Fix client_base and remove this.
std::shared_ptr<std::string> resp;
void
operator()(
command,
type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
std::size_t size,
std::error_code& ec)
{
adapter(t, aggregate_size, depth, data, size, ec);
}
};
using client_base_type = client_base<response_id>;
class my_redis_client : public client_base_type {
class my_redis_client : public client_base {
private:
std::vector<node> resp_;
std::vector<std::weak_ptr<user_session_base>> sessions_;
void on_message(response_id id) override
void on_message(command) override
{
id.resp->clear();
resp_.clear();
}
void on_push() override
{
for (auto& weak: sessions_) {
if (auto session = weak.lock()) {
session->deliver(push_resp_.at(3).data);
session->deliver(resp_.at(3).data);
} else {
std::cout << "Session expired." << std::endl;
}
}
push_resp_.clear();
resp_.clear();
}
public:
my_redis_client(net::any_io_executor ex)
: client_base_type(ex)
: client_base(ex, adapter_helper{adapt(resp_)})
{}
auto subscribe(std::shared_ptr<user_session_base> session)
{ sessions_.push_back(session); }
};
struct on_user_msg {
std::shared_ptr<std::string> resp;
std::shared_ptr<my_redis_client> client;
void operator()(std::string const& msg)
{
auto filler = [this, &msg](auto& req)
{
response_id id{command::publish, resp};
req.push(id, "channel", msg);
};
client->send(filler);
}
};
net::awaitable<void> listener()
{
auto ex = co_await net::this_coro::executor;
net::ip::tcp::acceptor acceptor(ex, {net::ip::tcp::v4(), 55555});
// The response is shared by all connections.
auto resp = std::make_shared<std::string>();
// The redis client instance.
auto client = std::make_shared<my_redis_client>(ex);
client->start();
auto filler = [resp](auto& req)
{ req.push(response_id{command::subscribe, resp}, "channel"); };
client->send(filler);
client->send(command::subscribe, "channel");
for (;;) {
auto socket = co_await acceptor.async_accept(net::use_awaitable);
auto session = std::make_shared<user_session>(std::move(socket));
client->subscribe(session);
session->start(on_user_msg{resp, client});
auto on_msg = [client](std::string const& msg)
{ client->send(command::publish, "channel", msg); };
session->start(on_msg);
}
}

View File

@@ -14,64 +14,73 @@
#include "lib/user_session.hpp"
#include "src.hpp"
namespace net = aedis::net;
using aedis::resp3::client_base;
using aedis::command;
using aedis::user_session;
using aedis::user_session_base;
using aedis::resp3::adapt;
using aedis::resp3::detail::response_traits;
using aedis::resp3::node;
using aedis::resp3::type;
namespace net = aedis::net;
using aedis::resp3::client_base;
// TODO: Use all necessary data types.
struct adapter_helper {
using adapter_type = response_traits<std::vector<node>>::adapter_type;
adapter_type adapter;
// Holds the information that is needed when a response to a
// request arrives. See client_base.hpp for more details on the
// required fields in this struct.
struct response_id {
// The redis command that corresponds to this command.
command cmd = command::unknown;
// Pointer to the response.
std::shared_ptr<std::string> resp;
// The pointer to the session the request belongs to.
std::weak_ptr<user_session_base> session =
std::shared_ptr<user_session_base>{nullptr};
void
operator()(
command,
type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
std::size_t size,
std::error_code& ec)
{
adapter(t, aggregate_size, depth, data, size, ec);
}
};
using client_base_type = client_base<response_id>;
class my_redis_client : public client_base_type {
class my_redis_client : public client_base {
private:
void on_message(response_id id) override
std::vector<node> resp_;
std::queue<std::weak_ptr<user_session_base>> sessions_;
void on_message(command cmd) override
{
// If the user connections is still alive when the response
// arrives we send the echo message to the user, otherwise we
// just log it has expired.
if (auto session = id.session.lock()) {
session->deliver(*id.resp);
id.resp->clear();
} else {
std::cout << "Session expired." << std::endl;
switch (cmd) {
case command::ping:
{
if (auto session = sessions_.front().lock()) {
session->deliver(resp_.front().data);
} else {
std::cout << "Session expired." << std::endl;
}
sessions_.pop();
resp_.clear();
} break;
case command::incr:
{
std::cout << "Echos so far: " << resp_.front().data << std::endl;
resp_.clear();
} break;
default:
{
assert(false);
}
}
}
public:
my_redis_client(net::any_io_executor ex)
: client_base_type(ex)
: client_base(ex, adapter_helper{adapt(resp_)})
{}
};
struct on_user_msg {
std::shared_ptr<std::string> resp;
std::shared_ptr<my_redis_client> client;
std::shared_ptr<user_session> session;
void operator()(std::string const& msg) const
{
auto filler = [this, &msg](auto& req)
{ req.push(response_id{command::ping, resp, session}, msg); };
client->send(filler);
}
void add_user_session(std::shared_ptr<user_session_base> session)
{ sessions_.push(session); }
};
net::awaitable<void> listener()
@@ -79,18 +88,21 @@ net::awaitable<void> listener()
auto ex = co_await net::this_coro::executor;
net::ip::tcp::acceptor acceptor(ex, {net::ip::tcp::v4(), 55555});
// The redis client instance.
auto client = std::make_shared<my_redis_client>(ex);
client->start();
// The response is shared by all connections.
auto resp = std::make_shared<std::string>();
// Loops accepting connections.
for (;;) {
auto socket = co_await acceptor.async_accept(net::use_awaitable);
auto session = std::make_shared<user_session>(std::move(socket));
session->start(on_user_msg{resp, client, session});
auto on_msg = [client, session](std::string const& msg)
{
client->send(command::ping, msg);
client->add_user_session(session);
client->send(command::incr, "echo-counter");
};
session->start(on_msg);
}
}

View File

@@ -8,6 +8,7 @@
#pragma once
#include <queue>
#include <functional>
#include <aedis/aedis.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
@@ -27,96 +28,107 @@ namespace resp3 {
*
* The ReponseId type is required to provide the cmd member.
*/
template <class ResponseId>
class client_base
: public std::enable_shared_from_this<client_base<ResponseId>> {
class client_base : public std::enable_shared_from_this<client_base> {
public:
using adapter_type = std::function<void(command, type, std::size_t, std::size_t, char const*, std::size_t, std::error_code&)>;
protected:
std::vector<node> push_resp_; // push types.
// TODO: Remove this.
std::vector<node> hello_; // Hello.
private:
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
struct helper {
struct request_info {
// Request size in bytes.
std::size_t size = 0;
// The number of commands it contains excluding commands that
// have push types as responses, see has_push_response.
std::size_t cmds = 0;
};
// We are in the middle of a refactoring and there is some mess.
// Requests payload.
std::string requests_;
std::queue<serializer<std::string, ResponseId>> srs_;
std::queue<helper> req_info_;
// The commands contained in the requests.
std::queue<command> commands_;
// Info about the requests.
std::queue<request_info> req_info_;
// The stream.
tcp_socket socket_;
// Timer used to inform the write coroutine that it can write the
// next message in the output queue.
net::steady_timer timer_;
// Adapter
adapter_type adapter_;
// A coroutine that keeps reading the socket. When a message
// arrives it calls on_message.
net::awaitable<void> reader()
{
// Writes and reads continuosly from the socket.
for (std::string buffer;;) {
// Writes the next request in the socket.
while (!std::empty(srs_)) {
while (!std::empty(req_info_)) {
co_await net::async_write(socket_, net::buffer(requests_.data(), req_info_.front().size));
requests_.erase(0, req_info_.front().size);
req_info_.pop();
if (!std::empty(srs_.front().commands))
requests_.erase(0, req_info_.front().size);
if (req_info_.front().cmds != 0)
break; // We must await the responses.
// Pops the request if no response is expected.
srs_.pop();
req_info_.pop();
}
// Keeps reading while there are no messages queued waiting to be sent.
do {
// Loops to consume the response to all commands in the request.
do {
auto const t =
co_await async_read_type(socket_, net::dynamic_buffer(buffer));
do { // Keeps reading while there are no messages queued waiting to be sent.
do { // Consumes the responses to all commands in the request.
auto const t = co_await async_read_type(socket_, net::dynamic_buffer(buffer));
if (t == type::push) {
co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapt(push_resp_));
auto adapter = [this](type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec){adapter_(command::unknown, t, aggregate_size, depth, data, size, ec);};
co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapter);
on_push();
} else {
auto adapter = adapt(*srs_.front().commands.front().resp);
auto adapter = [this](type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec){adapter_(commands_.front(), t, aggregate_size, depth, data, size, ec);};
co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapter);
on_message(srs_.front().commands.front());
srs_.front().commands.pop();
on_message(commands_.front());
commands_.pop();
--req_info_.front().cmds;
}
} while (!std::empty(srs_) && !std::empty(srs_.front().commands));
} while (!std::empty(req_info_) && req_info_.front().cmds != 0);
// We may exit the loop above either because we are done
// with the response or because we received a server push
// while the queue was empty.
if (!std::empty(srs_))
srs_.pop();
// We may exit the loop above either because we are done
// with the response or because we received a server push
// while the queue was empty so we have to check before
// poping..
if (!std::empty(req_info_))
req_info_.pop();
} while (std::empty(srs_));
} while (std::empty(req_info_));
}
}
// Write coroutine. It is kept suspended until there are messages
// that can be sent.
// to be sent.
net::awaitable<void> writer()
{
while (socket_.is_open()) {
boost::system::error_code ec;
co_await timer_.async_wait(net::redirect_error(net::use_awaitable, ec));
do {
assert(!std::empty(req_info_));
co_await net::async_write(socket_, net::buffer(requests_.data(), req_info_.front().size));
requests_.erase(0, req_info_.front().size);
if (req_info_.front().cmds != 0)
break;
req_info_.pop();
if (!std::empty(srs_.front().commands))
break; // We must await the responses.
// Pops the request if no response is expected.
srs_.pop();
} while (!std::empty(srs_));
} while (!std::empty(req_info_));
}
}
@@ -165,14 +177,12 @@ private:
*/
bool prepare_next()
{
if (std::empty(srs_)) {
srs_.push({requests_});
if (std::empty(req_info_)) {
req_info_.push({});
return true;
}
if (std::size(srs_) == 1) {
srs_.push({requests_});
if (std::size(req_info_) == 1) {
req_info_.push({});
return false;
}
@@ -181,9 +191,11 @@ private:
}
public:
client_base(net::any_io_executor ex)
// Constructor
client_base(net::any_io_executor ex, adapter_type adapter = [](command, type, std::size_t, std::size_t, char const*, std::size_t, std::error_code&) {})
: socket_{ex}
, timer_{ex}
, adapter_{adapter}
{ }
virtual ~client_base() { }
@@ -200,32 +212,22 @@ public:
net::detached);
}
/* Adds commands to the request queue and sends if possible.
*
* The filler callable get a request by reference, for example
*
* @code
* void f(serializer& req)
* {
* req.push(command::ping);
* ...
* }
* @endcode
*
* It will be called with the request that is at the back of the queue of
* outgoing requests.
*/
template <class Filler>
void send(Filler filler)
template <class... Ts>
void send(command cmd, Ts const&... args)
{
// Prepares the back of the queue for a new request.
auto const can_write = prepare_next();
auto sr = make_serializer<command>(requests_);
auto const before = std::size(requests_);
filler(srs_.back());
sr.push(cmd, args...);
auto const after = std::size(requests_);
req_info_.front().size += after - before;;
if (!has_push_response(cmd)) {
commands_.emplace(cmd);
++req_info_.front().cmds;
}
if (can_write)
timer_.cancel_one();
}
@@ -234,9 +236,9 @@ public:
*
* Override this function to receive events in your derived class.
*/
virtual void on_message(ResponseId) {};
virtual void on_message(command) {};
/* Called when server push is received.
/* Called when a server push is received.
*
* Override this function to receive push events in the derived class.
*/

View File

@@ -23,7 +23,6 @@
- \subpage enums
- \subpage classes
- \subpage read_write_ops
- \subpage functions
- \subpage operators
*/
@@ -150,16 +149,11 @@
*/
/** \defgroup functions Free functions (other)
/** \defgroup functions Free functions
* \brief All functions defined by this library.
*/
/** \defgroup read_write_ops Free functions (read/write operations)
* \brief RESP3 read and write functions.
*/
/** \defgroup operators Operators
* \brief Operators defined in Aedis
*/

View File

@@ -21,7 +21,7 @@ namespace aedis {
namespace resp3 {
/** \brief Read the response to a command sychronously.
* \ingroup read_write_ops
* \ingroup functions
*
* This function has to be called once for each command in the
* request until the whole request has been read.
@@ -89,7 +89,7 @@ read(
}
/** \brief Reads the reponse to a command.
* \ingroup read_write_ops
* \ingroup functions
*
* This function has to be called once for each command in the
* request until the whole request has been read.
@@ -120,7 +120,7 @@ read(
}
/** @brief Reads the response to a Redis command asynchronously.
* \ingroup read_write_ops
* \ingroup functions
*
* This function has to be called once for each command in the
* request until the whole request has been read.
@@ -161,7 +161,7 @@ auto async_read(
}
/** \brief Reads the RESP3 type of the next incomming.
* \ingroup read_write_ops
* \ingroup functions
*
* This function won't consume any data from the buffer. The
* completion handler must have the following signature.

View File

@@ -37,43 +37,23 @@ namespace resp3 {
* sr.push(command::quit);
* co_await async_write(socket, buffer(sr.request()));
* @endcode
*
* This class also maintains an internal queue of already added
* commands to assist users processing the response to each
* individual command contained in the request see response_queue.cpp
* for simple usage and echo_server.cpp for adavanced usage.
*/
template <class Container, class ResponseId>
template <class Container, class Command>
class serializer {
private:
Container* request_;
public:
/// The commands that have been queued in this request.
std::queue<ResponseId> commands;
public:
/// Constructor
serializer(Container& container) : request_(&container) {}
/** \brief Clears the serializer.
*
* \remark Already acquired memory won't be released. The is useful
* to reusing memory insteam of allocating again each time.
*/
void clear()
{
request_->clear();
commands = {};
}
/** @brief Appends a new command to the end of the request.
*
* Non-string types will be converted to string by using
* \c to_string, which must be made available by the user by ADL.
*/
template <class... Ts>
void push(ResponseId qelem, Ts const&... args)
void push(Command qelem, Ts const&... args)
{
// TODO: Should we detect any std::pair in the type in the pack
// to calculate the header size correctly?
@@ -81,12 +61,9 @@ public:
auto constexpr pack_size = sizeof...(Ts);
detail::add_header(*request_, 1 + pack_size);
auto const cmd = detail::request_get_command<ResponseId>::apply(qelem);
auto const cmd = detail::request_get_command<Command>::apply(qelem);
detail::add_bulk(*request_, to_string(cmd));
(detail::add_bulk(*request_, args), ...);
if (!has_push_response(cmd))
commands.emplace(qelem);
}
/** @brief Appends a new command to the end of the request.
@@ -105,7 +82,7 @@ public:
\endcode
*/
template <class Key, class ForwardIterator>
void push_range(ResponseId qelem, Key const& key, ForwardIterator begin, ForwardIterator end)
void push_range(Command qelem, Key const& key, ForwardIterator begin, ForwardIterator end)
{
// Note: For some commands like hset it would helpful to users
// to assert the value type is a pair.
@@ -115,15 +92,12 @@ public:
auto constexpr size = detail::value_type_size<value_type>::size;
auto const distance = std::distance(begin, end);
detail::add_header(*request_, 2 + size * distance);
auto const cmd = detail::request_get_command<ResponseId>::apply(qelem);
auto const cmd = detail::request_get_command<Command>::apply(qelem);
detail::add_bulk(*request_, to_string(cmd));
detail::add_bulk(*request_, key);
for (; begin != end; ++begin)
detail::add_bulk(*request_, *begin);
if (!has_push_response(cmd))
commands.emplace(qelem);
}
/** @brief Appends a new command to the end of the request.
@@ -140,7 +114,7 @@ public:
\endcode
*/
template <class ForwardIterator>
void push_range(ResponseId qelem, ForwardIterator begin, ForwardIterator end)
void push_range(Command qelem, ForwardIterator begin, ForwardIterator end)
{
// Note: For some commands like hset it would be a good idea to assert
// the value type is a pair.
@@ -150,24 +124,22 @@ public:
auto constexpr size = detail::value_type_size<value_type>::size;
auto const distance = std::distance(begin, end);
detail::add_header(*request_, 1 + size * distance);
auto const cmd = detail::request_get_command<ResponseId>::apply(qelem);
auto const cmd = detail::request_get_command<Command>::apply(qelem);
detail::add_bulk(*request_, to_string(cmd));
for (; begin != end; ++begin)
detail::add_bulk(*request_, *begin);
if (!has_push_response(cmd))
commands.emplace(qelem);
}
};
/** \brief Creates a serializer from a container.
* \ingroup functions
* TODO: Add the string template parameters.
*/
template <class ResponseId>
template <class Command>
auto make_serializer(std::string& container)
{
return serializer<std::string, ResponseId>(container);
return serializer<std::string, Command>(container);
}
} // resp3