From 0381ef605bf8be4e458d5fa17618fb6f67ab4113 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Mon, 17 Jan 2022 12:50:00 +0100 Subject: [PATCH] More progresses with the examples. --- examples/chat_room.cpp | 79 +++++++---------- examples/echo_server.cpp | 106 ++++++++++++---------- examples/lib/client_base.hpp | 136 +++++++++++++++-------------- include/aedis/aedis.hpp | 8 +- include/aedis/resp3/read.hpp | 8 +- include/aedis/resp3/serializer.hpp | 48 +++------- 6 files changed, 176 insertions(+), 209 deletions(-) diff --git a/examples/chat_room.cpp b/examples/chat_room.cpp index f4210461..6e084968 100644 --- a/examples/chat_room.cpp +++ b/examples/chat_room.cpp @@ -16,97 +16,84 @@ #include "lib/user_session.hpp" #include "src.hpp" +namespace net = aedis::net; using aedis::resp3::client_base; using aedis::command; using aedis::user_session; using aedis::user_session_base; +using aedis::resp3::node; +using aedis::resp3::detail::response_traits; +using aedis::resp3::type; -namespace net = aedis::net; -using aedis::resp3::client_base; +// TODO: Use all necessary data types. +struct adapter_helper { + using adapter_type = response_traits>::adapter_type; + adapter_type adapter; -// Holds the information that is needed when a response to a -// request arrives. See client_base.hpp for more details on the -// required fields in this struct. -struct response_id { - // The redis command that corresponds to this command. - command cmd = command::unknown; - - // Pointer to the response. - // TODO: Not needed here. Fix client_base and remove this. - std::shared_ptr resp; + void + operator()( + command, + type t, + std::size_t aggregate_size, + std::size_t depth, + char const* data, + std::size_t size, + std::error_code& ec) + { + adapter(t, aggregate_size, depth, data, size, ec); + } }; -using client_base_type = client_base; - -class my_redis_client : public client_base_type { +class my_redis_client : public client_base { private: + std::vector resp_; std::vector> sessions_; - void on_message(response_id id) override + void on_message(command) override { - id.resp->clear(); + resp_.clear(); } void on_push() override { for (auto& weak: sessions_) { if (auto session = weak.lock()) { - session->deliver(push_resp_.at(3).data); + session->deliver(resp_.at(3).data); } else { std::cout << "Session expired." << std::endl; } } - push_resp_.clear(); + resp_.clear(); } public: my_redis_client(net::any_io_executor ex) - : client_base_type(ex) + : client_base(ex, adapter_helper{adapt(resp_)}) {} auto subscribe(std::shared_ptr session) { sessions_.push_back(session); } }; -struct on_user_msg { - std::shared_ptr resp; - std::shared_ptr client; - - void operator()(std::string const& msg) - { - auto filler = [this, &msg](auto& req) - { - response_id id{command::publish, resp}; - req.push(id, "channel", msg); - }; - - client->send(filler); - } -}; - net::awaitable listener() { auto ex = co_await net::this_coro::executor; net::ip::tcp::acceptor acceptor(ex, {net::ip::tcp::v4(), 55555}); - // The response is shared by all connections. - auto resp = std::make_shared(); - - // The redis client instance. auto client = std::make_shared(ex); client->start(); - - auto filler = [resp](auto& req) - { req.push(response_id{command::subscribe, resp}, "channel"); }; - - client->send(filler); + client->send(command::subscribe, "channel"); for (;;) { auto socket = co_await acceptor.async_accept(net::use_awaitable); auto session = std::make_shared(std::move(socket)); client->subscribe(session); - session->start(on_user_msg{resp, client}); + + auto on_msg = [client](std::string const& msg) + { client->send(command::publish, "channel", msg); }; + + session->start(on_msg); } } diff --git a/examples/echo_server.cpp b/examples/echo_server.cpp index 2facb652..4dab37bf 100644 --- a/examples/echo_server.cpp +++ b/examples/echo_server.cpp @@ -14,64 +14,73 @@ #include "lib/user_session.hpp" #include "src.hpp" +namespace net = aedis::net; using aedis::resp3::client_base; using aedis::command; using aedis::user_session; using aedis::user_session_base; +using aedis::resp3::adapt; +using aedis::resp3::detail::response_traits; +using aedis::resp3::node; +using aedis::resp3::type; -namespace net = aedis::net; -using aedis::resp3::client_base; +// TODO: Use all necessary data types. +struct adapter_helper { + using adapter_type = response_traits>::adapter_type; + adapter_type adapter; -// Holds the information that is needed when a response to a -// request arrives. See client_base.hpp for more details on the -// required fields in this struct. -struct response_id { - // The redis command that corresponds to this command. - command cmd = command::unknown; - - // Pointer to the response. - std::shared_ptr resp; - - // The pointer to the session the request belongs to. - std::weak_ptr session = - std::shared_ptr{nullptr}; + void + operator()( + command, + type t, + std::size_t aggregate_size, + std::size_t depth, + char const* data, + std::size_t size, + std::error_code& ec) + { + adapter(t, aggregate_size, depth, data, size, ec); + } }; -using client_base_type = client_base; - -class my_redis_client : public client_base_type { +class my_redis_client : public client_base { private: - void on_message(response_id id) override + std::vector resp_; + std::queue> sessions_; + + void on_message(command cmd) override { - // If the user connections is still alive when the response - // arrives we send the echo message to the user, otherwise we - // just log it has expired. - if (auto session = id.session.lock()) { - session->deliver(*id.resp); - id.resp->clear(); - } else { - std::cout << "Session expired." << std::endl; + switch (cmd) { + case command::ping: + { + if (auto session = sessions_.front().lock()) { + session->deliver(resp_.front().data); + } else { + std::cout << "Session expired." << std::endl; + } + + sessions_.pop(); + resp_.clear(); + } break; + case command::incr: + { + std::cout << "Echos so far: " << resp_.front().data << std::endl; + resp_.clear(); + } break; + default: + { + assert(false); + } } } public: my_redis_client(net::any_io_executor ex) - : client_base_type(ex) + : client_base(ex, adapter_helper{adapt(resp_)}) {} -}; -struct on_user_msg { - std::shared_ptr resp; - std::shared_ptr client; - std::shared_ptr session; - - void operator()(std::string const& msg) const - { - auto filler = [this, &msg](auto& req) - { req.push(response_id{command::ping, resp, session}, msg); }; - - client->send(filler); - } + void add_user_session(std::shared_ptr session) + { sessions_.push(session); } }; net::awaitable listener() @@ -79,18 +88,21 @@ net::awaitable listener() auto ex = co_await net::this_coro::executor; net::ip::tcp::acceptor acceptor(ex, {net::ip::tcp::v4(), 55555}); - // The redis client instance. auto client = std::make_shared(ex); client->start(); - // The response is shared by all connections. - auto resp = std::make_shared(); - - // Loops accepting connections. for (;;) { auto socket = co_await acceptor.async_accept(net::use_awaitable); auto session = std::make_shared(std::move(socket)); - session->start(on_user_msg{resp, client, session}); + + auto on_msg = [client, session](std::string const& msg) + { + client->send(command::ping, msg); + client->add_user_session(session); + client->send(command::incr, "echo-counter"); + }; + + session->start(on_msg); } } diff --git a/examples/lib/client_base.hpp b/examples/lib/client_base.hpp index da46842b..f6ec6f7c 100644 --- a/examples/lib/client_base.hpp +++ b/examples/lib/client_base.hpp @@ -8,6 +8,7 @@ #pragma once #include +#include #include #include @@ -27,96 +28,107 @@ namespace resp3 { * * The ReponseId type is required to provide the cmd member. */ -template -class client_base - : public std::enable_shared_from_this> { +class client_base : public std::enable_shared_from_this { +public: + using adapter_type = std::function; + protected: - std::vector push_resp_; // push types. + // TODO: Remove this. std::vector hello_; // Hello. private: using tcp_socket = net::use_awaitable_t<>::as_default_on_t; - struct helper { + + struct request_info { + // Request size in bytes. std::size_t size = 0; + + // The number of commands it contains excluding commands that + // have push types as responses, see has_push_response. std::size_t cmds = 0; }; - // We are in the middle of a refactoring and there is some mess. + // Requests payload. std::string requests_; - std::queue> srs_; - std::queue req_info_; + + // The commands contained in the requests. + std::queue commands_; + + // Info about the requests. + std::queue req_info_; + + // The stream. tcp_socket socket_; // Timer used to inform the write coroutine that it can write the // next message in the output queue. net::steady_timer timer_; + // Adapter + adapter_type adapter_; + // A coroutine that keeps reading the socket. When a message // arrives it calls on_message. net::awaitable reader() { // Writes and reads continuosly from the socket. for (std::string buffer;;) { - // Writes the next request in the socket. - while (!std::empty(srs_)) { + while (!std::empty(req_info_)) { co_await net::async_write(socket_, net::buffer(requests_.data(), req_info_.front().size)); - requests_.erase(0, req_info_.front().size); - req_info_.pop(); - if (!std::empty(srs_.front().commands)) + requests_.erase(0, req_info_.front().size); + + if (req_info_.front().cmds != 0) break; // We must await the responses. - // Pops the request if no response is expected. - srs_.pop(); + req_info_.pop(); } - // Keeps reading while there are no messages queued waiting to be sent. - do { - // Loops to consume the response to all commands in the request. - do { - auto const t = - co_await async_read_type(socket_, net::dynamic_buffer(buffer)); - + do { // Keeps reading while there are no messages queued waiting to be sent. + do { // Consumes the responses to all commands in the request. + auto const t = co_await async_read_type(socket_, net::dynamic_buffer(buffer)); if (t == type::push) { - co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapt(push_resp_)); + auto adapter = [this](type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec){adapter_(command::unknown, t, aggregate_size, depth, data, size, ec);}; + co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapter); on_push(); } else { - auto adapter = adapt(*srs_.front().commands.front().resp); + auto adapter = [this](type t, std::size_t aggregate_size, std::size_t depth, char const* data, std::size_t size, std::error_code& ec){adapter_(commands_.front(), t, aggregate_size, depth, data, size, ec);}; co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapter); - on_message(srs_.front().commands.front()); - srs_.front().commands.pop(); + on_message(commands_.front()); + commands_.pop(); + --req_info_.front().cmds; } - } while (!std::empty(srs_) && !std::empty(srs_.front().commands)); + } while (!std::empty(req_info_) && req_info_.front().cmds != 0); - // We may exit the loop above either because we are done - // with the response or because we received a server push - // while the queue was empty. - if (!std::empty(srs_)) - srs_.pop(); + // We may exit the loop above either because we are done + // with the response or because we received a server push + // while the queue was empty so we have to check before + // poping.. + if (!std::empty(req_info_)) + req_info_.pop(); - } while (std::empty(srs_)); + } while (std::empty(req_info_)); } } // Write coroutine. It is kept suspended until there are messages - // that can be sent. + // to be sent. net::awaitable writer() { while (socket_.is_open()) { boost::system::error_code ec; co_await timer_.async_wait(net::redirect_error(net::use_awaitable, ec)); do { + assert(!std::empty(req_info_)); co_await net::async_write(socket_, net::buffer(requests_.data(), req_info_.front().size)); requests_.erase(0, req_info_.front().size); + + if (req_info_.front().cmds != 0) + break; + req_info_.pop(); - - if (!std::empty(srs_.front().commands)) - break; // We must await the responses. - - // Pops the request if no response is expected. - srs_.pop(); - } while (!std::empty(srs_)); + } while (!std::empty(req_info_)); } } @@ -165,14 +177,12 @@ private: */ bool prepare_next() { - if (std::empty(srs_)) { - srs_.push({requests_}); + if (std::empty(req_info_)) { req_info_.push({}); return true; } - if (std::size(srs_) == 1) { - srs_.push({requests_}); + if (std::size(req_info_) == 1) { req_info_.push({}); return false; } @@ -181,9 +191,11 @@ private: } public: - client_base(net::any_io_executor ex) + // Constructor + client_base(net::any_io_executor ex, adapter_type adapter = [](command, type, std::size_t, std::size_t, char const*, std::size_t, std::error_code&) {}) : socket_{ex} , timer_{ex} + , adapter_{adapter} { } virtual ~client_base() { } @@ -200,32 +212,22 @@ public: net::detached); } - /* Adds commands to the request queue and sends if possible. - * - * The filler callable get a request by reference, for example - * - * @code - * void f(serializer& req) - * { - * req.push(command::ping); - * ... - * } - * @endcode - * - * It will be called with the request that is at the back of the queue of - * outgoing requests. - */ - template - void send(Filler filler) + template + void send(command cmd, Ts const&... args) { - // Prepares the back of the queue for a new request. auto const can_write = prepare_next(); + auto sr = make_serializer(requests_); auto const before = std::size(requests_); - filler(srs_.back()); + sr.push(cmd, args...); auto const after = std::size(requests_); req_info_.front().size += after - before;; + if (!has_push_response(cmd)) { + commands_.emplace(cmd); + ++req_info_.front().cmds; + } + if (can_write) timer_.cancel_one(); } @@ -234,9 +236,9 @@ public: * * Override this function to receive events in your derived class. */ - virtual void on_message(ResponseId) {}; + virtual void on_message(command) {}; - /* Called when server push is received. + /* Called when a server push is received. * * Override this function to receive push events in the derived class. */ diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index a5fbc59b..cb998dde 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -23,7 +23,6 @@ - \subpage enums - \subpage classes - - \subpage read_write_ops - \subpage functions - \subpage operators */ @@ -150,16 +149,11 @@ */ -/** \defgroup functions Free functions (other) +/** \defgroup functions Free functions * \brief All functions defined by this library. */ -/** \defgroup read_write_ops Free functions (read/write operations) - * \brief RESP3 read and write functions. - */ - - /** \defgroup operators Operators * \brief Operators defined in Aedis */ diff --git a/include/aedis/resp3/read.hpp b/include/aedis/resp3/read.hpp index 9b32b418..a1ea2c04 100644 --- a/include/aedis/resp3/read.hpp +++ b/include/aedis/resp3/read.hpp @@ -21,7 +21,7 @@ namespace aedis { namespace resp3 { /** \brief Read the response to a command sychronously. - * \ingroup read_write_ops + * \ingroup functions * * This function has to be called once for each command in the * request until the whole request has been read. @@ -89,7 +89,7 @@ read( } /** \brief Reads the reponse to a command. - * \ingroup read_write_ops + * \ingroup functions * * This function has to be called once for each command in the * request until the whole request has been read. @@ -120,7 +120,7 @@ read( } /** @brief Reads the response to a Redis command asynchronously. - * \ingroup read_write_ops + * \ingroup functions * * This function has to be called once for each command in the * request until the whole request has been read. @@ -161,7 +161,7 @@ auto async_read( } /** \brief Reads the RESP3 type of the next incomming. - * \ingroup read_write_ops + * \ingroup functions * * This function won't consume any data from the buffer. The * completion handler must have the following signature. diff --git a/include/aedis/resp3/serializer.hpp b/include/aedis/resp3/serializer.hpp index 3fa1b77c..86a470af 100644 --- a/include/aedis/resp3/serializer.hpp +++ b/include/aedis/resp3/serializer.hpp @@ -37,43 +37,23 @@ namespace resp3 { * sr.push(command::quit); * co_await async_write(socket, buffer(sr.request())); * @endcode - * - * This class also maintains an internal queue of already added - * commands to assist users processing the response to each - * individual command contained in the request see response_queue.cpp - * for simple usage and echo_server.cpp for adavanced usage. */ -template +template class serializer { private: Container* request_; -public: - /// The commands that have been queued in this request. - std::queue commands; - public: /// Constructor serializer(Container& container) : request_(&container) {} - /** \brief Clears the serializer. - * - * \remark Already acquired memory won't be released. The is useful - * to reusing memory insteam of allocating again each time. - */ - void clear() - { - request_->clear(); - commands = {}; - } - /** @brief Appends a new command to the end of the request. * * Non-string types will be converted to string by using * \c to_string, which must be made available by the user by ADL. */ template - void push(ResponseId qelem, Ts const&... args) + void push(Command qelem, Ts const&... args) { // TODO: Should we detect any std::pair in the type in the pack // to calculate the header size correctly? @@ -81,12 +61,9 @@ public: auto constexpr pack_size = sizeof...(Ts); detail::add_header(*request_, 1 + pack_size); - auto const cmd = detail::request_get_command::apply(qelem); + auto const cmd = detail::request_get_command::apply(qelem); detail::add_bulk(*request_, to_string(cmd)); (detail::add_bulk(*request_, args), ...); - - if (!has_push_response(cmd)) - commands.emplace(qelem); } /** @brief Appends a new command to the end of the request. @@ -105,7 +82,7 @@ public: \endcode */ template - void push_range(ResponseId qelem, Key const& key, ForwardIterator begin, ForwardIterator end) + void push_range(Command qelem, Key const& key, ForwardIterator begin, ForwardIterator end) { // Note: For some commands like hset it would helpful to users // to assert the value type is a pair. @@ -115,15 +92,12 @@ public: auto constexpr size = detail::value_type_size::size; auto const distance = std::distance(begin, end); detail::add_header(*request_, 2 + size * distance); - auto const cmd = detail::request_get_command::apply(qelem); + auto const cmd = detail::request_get_command::apply(qelem); detail::add_bulk(*request_, to_string(cmd)); detail::add_bulk(*request_, key); for (; begin != end; ++begin) detail::add_bulk(*request_, *begin); - - if (!has_push_response(cmd)) - commands.emplace(qelem); } /** @brief Appends a new command to the end of the request. @@ -140,7 +114,7 @@ public: \endcode */ template - void push_range(ResponseId qelem, ForwardIterator begin, ForwardIterator end) + void push_range(Command qelem, ForwardIterator begin, ForwardIterator end) { // Note: For some commands like hset it would be a good idea to assert // the value type is a pair. @@ -150,24 +124,22 @@ public: auto constexpr size = detail::value_type_size::size; auto const distance = std::distance(begin, end); detail::add_header(*request_, 1 + size * distance); - auto const cmd = detail::request_get_command::apply(qelem); + auto const cmd = detail::request_get_command::apply(qelem); detail::add_bulk(*request_, to_string(cmd)); for (; begin != end; ++begin) detail::add_bulk(*request_, *begin); - - if (!has_push_response(cmd)) - commands.emplace(qelem); } }; /** \brief Creates a serializer from a container. * \ingroup functions + * TODO: Add the string template parameters. */ -template +template auto make_serializer(std::string& container) { - return serializer(container); + return serializer(container); } } // resp3