From 4189f7dc5660c54b5aca24d9ed50993c3009463e Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 21 Nov 2021 21:00:22 +0100 Subject: [PATCH] More improvements in the examples. --- Makefile.am | 15 ++- doc/Doxyfile | 2 +- examples/advanced.cpp | 146 --------------------- examples/basic1.cpp | 23 ++-- examples/basic2.cpp | 3 + examples/basic3.cpp | 4 +- examples/basic4.cpp | 38 +++--- examples/client_base.hpp | 181 ++++++++++++++++++++++++++ examples/containers.cpp | 79 +++++++---- examples/custom_response1.cpp | 10 +- examples/custom_response2.cpp | 16 ++- examples/myclient1.cpp | 77 +++++++++++ examples/utils.ipp | 4 +- include/aedis/aedis.hpp | 54 ++------ include/aedis/resp3/read.hpp | 75 ++--------- include/aedis/resp3/response.hpp | 7 +- include/aedis/resp3/response_base.hpp | 12 +- include/aedis/resp3/stream.hpp | 158 ---------------------- include/aedis/resp3/type.hpp | 1 + include/aedis/resp3/write.hpp | 43 +++--- tests/general.cpp | 36 ++--- 21 files changed, 455 insertions(+), 529 deletions(-) delete mode 100644 examples/advanced.cpp create mode 100644 examples/client_base.hpp create mode 100644 examples/myclient1.cpp delete mode 100644 include/aedis/resp3/stream.hpp diff --git a/Makefile.am b/Makefile.am index dd8f07ea..d5c544a6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -42,11 +42,6 @@ libaedis_a_SOURCES += $(top_srcdir)/src/aedis.cpp libaedis_a_CPPFLAGS = $(MY_CPPFLAGS) noinst_PROGRAMS = -noinst_PROGRAMS += advanced -advanced_SOURCES = $(top_srcdir)/examples/advanced.cpp -advanced_CPPFLAGS = $(MY_CPPFLAGS) -advanced_LDADD = $(MY_LDADD) - noinst_PROGRAMS += basic1 basic1_SOURCES = $(top_srcdir)/examples/basic1.cpp basic1_CPPFLAGS = $(MY_CPPFLAGS) @@ -82,6 +77,11 @@ custom_response2_SOURCES = $(top_srcdir)/examples/custom_response2.cpp custom_response2_CPPFLAGS = $(MY_CPPFLAGS) custom_response2_LDADD = $(MY_LDADD) +noinst_PROGRAMS += myclient1 +myclient1_SOURCES = $(top_srcdir)/examples/myclient1.cpp +myclient1_CPPFLAGS = $(MY_CPPFLAGS) +myclient1_LDADD = $(MY_LDADD) + noinst_PROGRAMS += test test_SOURCES = $(top_srcdir)/tests/general.cpp test_CPPFLAGS = $(MY_CPPFLAGS) @@ -96,6 +96,11 @@ EXTRA_DIST += $(top_srcdir)/README.md CLEANFILES = CLEANFILES += Makefile.dep +.PHONY: doc +doc: + rm -rf /tmp/aedis + doxygen doc/Doxyfile + .PHONY: deb deb: dist export CPPFLAGS="$(BOOST_CPPFLAGS) $(CPPFLAGS)" &&\ diff --git a/doc/Doxyfile b/doc/Doxyfile index d122fcd2..a4f9d98e 100644 --- a/doc/Doxyfile +++ b/doc/Doxyfile @@ -917,7 +917,7 @@ RECURSIVE = YES # Note that relative paths are relative to the directory from which doxygen is # run. -EXCLUDE = +EXCLUDE = include/aedis/impl include/aedis/resp3/impl include/aedis/resp3/detail # The EXCLUDE_SYMLINKS tag can be used to select whether or not files or # directories that are symbolic links (a Unix file system feature) are excluded diff --git a/examples/advanced.cpp b/examples/advanced.cpp deleted file mode 100644 index 4d867b3c..00000000 --- a/examples/advanced.cpp +++ /dev/null @@ -1,146 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include - -#include - -#include "types.hpp" -#include "utils.ipp" - -using aedis::command; -using aedis::resp3::request; -using aedis::resp3::type; -using aedis::resp3::response; -using aedis::resp3::async_read; -using aedis::resp3::async_read_type; - -namespace net = aedis::net; - - -/* A more elaborate way of doing what has been done above where we send a new - * command only after the last one has arrived. This is usually the starting - * point for more complex applications. Here we also separate the application - * logic out out the coroutine for clarity. - */ -bool prepare_next(std::queue& reqs) -{ - if (std::empty(reqs)) { - reqs.push({}); - return true; - } - - if (std::size(reqs) == 1) { - reqs.push({}); - return false; - } - - return false; -} - -net::awaitable -writer(tcp_socket& socket, std::queue& reqs, std::string message) -{ - auto ex = co_await aedis::net::this_coro::executor; - net::steady_timer t{ex}; - - while (socket.is_open()) { - t.expires_after(std::chrono::milliseconds{100}); - co_await t.async_wait(net::use_awaitable); - - auto const can_write = prepare_next(reqs); - reqs.back().push(command::publish, "channel", message); - reqs.back().push(command::publish, "channel", message); - reqs.back().push(command::publish, "channel", message); - if (can_write) - co_await async_write(socket, reqs.front()); - } -} - -net::awaitable reader(tcp_socket& socket, std::queue& reqs) -{ - // Writes and reads continuosly from the socket. - for (std::string buffer;;) { - // Writes the first request in queue and all subsequent - // ones that have no response e.g. subscribe. - do { - co_await async_write(socket, reqs.front()); - - // Pops the request if no response is expected. - if (std::empty(reqs.front().commands)) - reqs.pop(); - - } while (!std::empty(reqs) && std::empty(reqs.front().commands)); - - // Keeps reading while there is no messages queued waiting to be sent. - do { - // Loops to consume the response to all commands in the request. - do { - // Reads the type of the incoming response. - auto const t = co_await async_read_type(socket, buffer); - - if (t == type::push) { - response resp; - co_await async_read(socket, buffer, resp); - std::cout << resp << std::endl; - } else { - // Prints the command and the response to it. - switch (reqs.front().commands.front()) { - case command::hello: - { - response resp; - co_await async_read(socket, buffer, resp); - - for (auto i = 0; i < 100; ++i) { - std::string msg = "Writer "; - msg += std::to_string(i); - co_spawn(socket.get_executor(), writer(socket, reqs, msg), net::detached); - } - } break; - default: - { - response resp; - co_await async_read(socket, buffer, resp); - - std::cout - << reqs.front().commands.front() << ":\n" - << resp << std::endl; - } - } - // Done with this command, pop. - reqs.front().commands.pop(); - } - } while (!std::empty(reqs) && !std::empty(reqs.front().commands)); - - // We may exit the loop above either because we are done - // with the response or because we received a server push - // while the queue was empty. - if (!std::empty(reqs)) - reqs.pop(); - - } while (std::empty(reqs)); - } -} - -net::awaitable advanced() -{ - auto socket = co_await make_connection("127.0.0.1", "6379"); - - std::queue reqs; - reqs.push({}); - reqs.back().push(command::hello, 3); - reqs.back().push(command::subscribe, "channel"); - - co_await co_spawn(socket.get_executor(), reader(socket, reqs), net::use_awaitable); -} - -int main() -{ - net::io_context ioc; - co_spawn(ioc, advanced(), net::detached); - ioc.run(); -} diff --git a/examples/basic1.cpp b/examples/basic1.cpp index e0634118..797fa630 100644 --- a/examples/basic1.cpp +++ b/examples/basic1.cpp @@ -20,16 +20,16 @@ using aedis::resp3::async_read; namespace net = aedis::net; -/** A simple example that illustrates the basic principles. Three - * commands are sent in the same request - * - * 1. hello (always required) - * 2. ping - * 3. quit - * - * The responses are then read individually and for simplification in - * the same response object. - */ +/* A simple example that illustrates the basic principles. Three + commands are sent in the same request + + 1. hello (always required) + 2. ping + 3. quit + + The responses are then read individually and for simplification in + the same response object. +*/ net::awaitable ping() { try { @@ -60,9 +60,12 @@ net::awaitable ping() } } +/// The main function that starts the coroutine. int main() { net::io_context ioc; co_spawn(ioc, ping(), net::detached); ioc.run(); } + +/// \example basic1.cpp diff --git a/examples/basic2.cpp b/examples/basic2.cpp index 8639be42..330c5140 100644 --- a/examples/basic2.cpp +++ b/examples/basic2.cpp @@ -1,4 +1,5 @@ /* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/// \example basic1.cpp * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -57,3 +58,5 @@ int main() co_spawn(ioc, ping(), net::detached); ioc.run(); } + +/// \example basic2.cpp diff --git a/examples/basic3.cpp b/examples/basic3.cpp index 09a0f81d..551a50d4 100644 --- a/examples/basic3.cpp +++ b/examples/basic3.cpp @@ -33,7 +33,7 @@ void prepare_next(std::queue& reqs) reqs.push({}); } -/** The function that processes the response has been factored out of +/* The function that processes the response has been factored out of * the coroutine to simplify application logic. */ void process_response(std::queue& reqs, response& resp) @@ -85,3 +85,5 @@ int main() co_spawn(ioc, ping(), net::detached); ioc.run(); } + +/// \example basic3.cpp diff --git a/examples/basic4.cpp b/examples/basic4.cpp index 842a810d..855cc6ff 100644 --- a/examples/basic4.cpp +++ b/examples/basic4.cpp @@ -14,24 +14,24 @@ using namespace aedis; -/** In previous examples we sent the command we were interested in and - * quit (closed) the connection. In this example we send a - * subscription to a channel and start reading for message - * indefinitely. - * - * Notice we store the id of the connection as seem by redis to be - * able to identify it. - * - * After starting the example you can send messages with the - * redis-client like this - * - * $ redis-cli -3 - * 127.0.0.1:6379> PUBLISH channel1 mmmm - * (integer) 3 - * 127.0.0.1:6379> - * - * The messages will then appear on the terminal you are running the - * example. +/* In previous examples we sent the command we were interested in and + quit (closed) the connection. In this example we send a + subscription to a channel and start reading for message + indefinitely. + + Notice we store the id of the connection as seem by redis to be + able to identify it. + + After starting the example you can send messages with the + redis-client like this + + $ redis-cli -3 + 127.0.0.1:6379> PUBLISH channel1 mmmm + (integer) 3 + 127.0.0.1:6379> + + The messages will then appear on the terminal you are running the + example. */ net::awaitable subscriber() { @@ -73,3 +73,5 @@ int main() co_spawn(ioc, subscriber(), net::detached); ioc.run(); } + +/// \example basic4.cpp diff --git a/examples/client_base.hpp b/examples/client_base.hpp new file mode 100644 index 00000000..566912a9 --- /dev/null +++ b/examples/client_base.hpp @@ -0,0 +1,181 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include + +#include + +#include "types.hpp" + +namespace aedis { +namespace resp3 { + +/* An example redis client. + */ +class client_base : public std::enable_shared_from_this { +private: + tcp_socket socket_; + net::steady_timer timer_; + std::queue reqs_; + + // Prepares the back of the request queue for new requests. Returns true + // when the front of the queue can be sent to the redis server. + bool prepare_next() + { + if (std::empty(reqs_)) { + // We are not waiting any response. + reqs_.push({}); + return true; + } + + // A non-empty queue means we are waiting for a response. Since the + // reader will automatically send any outstanding requests when the + // response arrives, we have to return false hier. + + if (std::size(reqs_) == 1) { + // The user should not append any new commands to this request as it + // has been already sent to redis. + reqs_.push({}); + } + + return false; + } + + net::awaitable reader() + { + // Writes and reads continuosly from the socket. + for (std::string buffer;;) { + // Writes the first request in queue and all subsequent ones that have + // no response e.g. subscribe. + co_await async_write_some(socket_, reqs_); + + // Keeps reading while there is no messages queued waiting to be sent. + do { + // Loops to consume the response to all commands in the request. + do { + // Reads the type of the incoming response. + auto const t = co_await async_read_type(socket_, buffer); + + if (t == type::push) { + auto& resp = get_response(t); + co_await async_read(socket_, buffer, resp); + on_event(t); + } else { + auto& resp = get_response(t, reqs_.front().commands.front()); + co_await async_read(socket_, buffer, resp); + on_event(t, reqs_.front().commands.front()); + reqs_.front().commands.pop(); + } + + } while (!std::empty(reqs_) && !std::empty(reqs_.front().commands)); + + // We may exit the loop above either because we are done + // with the response or because we received a server push + // while the queue was empty. + if (!std::empty(reqs_)) + reqs_.pop(); + + } while (std::empty(reqs_)); + } + } + + net::awaitable writer() + { + for (;;) { + boost::system::error_code ec; + co_await timer_.async_wait(net::redirect_error(net::use_awaitable, ec)); + + if (!socket_.is_open()) + break; + + co_await async_write_some(socket_, reqs_); + } + } + + net::awaitable conn_manager() + { + tcp_resolver resolver{socket_.get_executor()}; + auto const res = co_await resolver.async_resolve("127.0.0.1", "6379"); + co_await aedis::net::async_connect(socket_, res); + + reqs_.push({}); + reqs_.back().push(command::hello, 3); + + co_spawn(socket_.get_executor(), + [self = shared_from_this()]{ return self->writer(); }, + net::detached); + + co_await co_spawn(socket_.get_executor(), + [self = shared_from_this()]{ return self->reader(); }, + net::use_awaitable); + + socket_.close(); + timer_.cancel_one(); + } + +public: + client_base(net::any_io_executor ex) + : socket_{ex} + , timer_{ex} + { + timer_.expires_at(std::chrono::steady_clock::time_point::max()); + } + + ~client_base() + { + socket_.close(); + timer_.cancel(); + } + + net::any_io_executor get_executor() + { + return socket_.get_executor(); + } + + void start() + { + net::co_spawn(socket_.get_executor(), + [self = shared_from_this()]{ return self->conn_manager(); }, + net::detached); + } + + /* Adds commands the requests queue and sends if possible. + */ + template + void send(Filler filler) + { + // Prepares the back of the queue for a new command. + auto const can_write = prepare_next(); + + filler(reqs_.back()); + + if (can_write) + timer_.cancel_one(); + } + + /* @brief Returns the response object the the used wishes to use + */ + virtual response_base& + get_response(type t, command cmd = command::unknown) = 0; + + /* Function called when data has been aready. + */ + virtual void + on_event(type t, command cmd = command::unknown) = 0; + + template + std::shared_ptr shared_from_base() + { + return std::static_pointer_cast(shared_from_this()); + } +}; + +} // resp3 +} // aedis + diff --git a/examples/containers.cpp b/examples/containers.cpp index ddf14fde..681f3793 100644 --- a/examples/containers.cpp +++ b/examples/containers.cpp @@ -16,42 +16,66 @@ #include "types.hpp" #include "utils.ipp" -using namespace aedis; +using aedis::command; +using aedis::resp3::request; +using aedis::resp3::response; +using aedis::resp3::async_read; +using aedis::resp3::async_write; + +namespace net = aedis::net; net::awaitable stl_containers() { - std::vector vec - {1, 2, 3, 4, 5, 6}; + try { + auto socket = co_await make_connection("127.0.0.1", "6379"); - std::set set - {"one", "two", "three"}; + request req; - std::map map - { {"key1", "value1"} - , {"key2", "value2"} - , {"key3", "value3"} - }; + // hello with version 3 is always required. + req.push(command::hello, 3); - resp3::request req; - req.push(command::hello, 3); - req.push(command::flushall); - req.push_range(command::rpush, "vector", std::cbegin(vec), std::cend(vec)); - req.push_range(command::sadd, "set", std::cbegin(set), std::cend(set)); - req.push_range(command::hset, "map", std::cbegin(map), std::cend(map)); + // Sends a flushall to avoid hitting an existing that happen to contain a + // different data type. + req.push(command::flushall); - auto socket = co_await make_connection("127.0.0.1", "6379"); - co_await async_write(socket, req); + // rpush with a vector. + std::vector vec + {1, 2, 3, 4, 5, 6}; - std::string buffer; - while (!std::empty(req.commands)) { - resp3::response resp; - co_await async_read(socket, buffer, resp); + req.push_range(command::rpush, "key1", std::cbegin(vec), std::cend(vec)); - std::cout - << req.commands.front() << ":\n" - << resp << std::endl; + // sadd with a set. + std::set set + {"one", "two", "three"}; - req.commands.pop(); + req.push_range(command::sadd, "key2", std::cbegin(set), std::cend(set)); + std::cout << "cc" << std::endl; + + // hset with a map. + std::map map + { {"key1", "value1"} + , {"key2", "value2"} + , {"key3", "value3"} + }; + + req.push_range(command::hset, "key3", std::cbegin(map), std::cend(map)); + + // Communication with the redis server starts here. + co_await async_write(socket, req); + + std::string buffer; + while (!std::empty(req.commands)) { + response resp; + co_await async_read(socket, buffer, resp); + + std::cout + << req.commands.front() << ":\n" + << resp << std::endl; + + req.commands.pop(); + } + } catch (std::exception const& e) { + std::cerr << e.what() << std::endl; } } @@ -61,3 +85,6 @@ int main() co_spawn(ioc, stl_containers(), net::detached); ioc.run(); } + + +/// \example containers.cpp diff --git a/examples/custom_response1.cpp b/examples/custom_response1.cpp index 86611f60..47fce2ab 100644 --- a/examples/custom_response1.cpp +++ b/examples/custom_response1.cpp @@ -23,12 +23,12 @@ using aedis::resp3::response_base; namespace net = aedis::net; -/** Illustrates how to write a custom response. Useful to users +/* Illustrates how to write a custom response. Useful to users * seeking to improve performance and reduce latency. */ -/** A response that parses the result of a response directly in an int +/* A response that parses the result of a response directly in an int * variable. The same reasoning can be applied for keys containing * e.g. json strings. * @@ -49,10 +49,10 @@ struct response_int : response_base { } }; -/// To ignore the reponse of a command use the response base class. +// To ignore the reponse of a command use the response base class. using response_ignore = response_base; -/** This coroutine avoids reading the response to a get command in a +/* This coroutine avoids reading the response to a get command in a * temporary buffer by using a custom response. This is always * possible when the application knows the data type being stored in * a specific key. @@ -99,3 +99,5 @@ int main() co_spawn(ioc, example(), net::detached); ioc.run(); } + +/// \example custom_response1.cpp diff --git a/examples/custom_response2.cpp b/examples/custom_response2.cpp index b56f40f0..828b43f3 100644 --- a/examples/custom_response2.cpp +++ b/examples/custom_response2.cpp @@ -23,14 +23,16 @@ using aedis::resp3::response_base; namespace net = aedis::net; -/** Similar to custon_response1 but handles aggregagate data types. - * - * Instead of reading a response into a std::vector and - * then converting to a std::vector we parse the ints form the - * read buffer directly into an int. +/* \brief An example response class for aggregate data types. + + Similar to custon_response1 but handles aggregagate data types. + + Instead of reading a response into a std::vector and + then converting to a std::vector we parse the ints form the + read buffer directly into an int. */ -/** A response type that parses the response directly in a +/* A response type that parses the response directly in a * std::vector */ class response_vector : public response_base { @@ -106,3 +108,5 @@ int main() co_spawn(ioc, ping(), net::detached); ioc.run(); } + +/// \example custom_response2.cpp diff --git a/examples/myclient1.cpp b/examples/myclient1.cpp new file mode 100644 index 00000000..d9fe41b7 --- /dev/null +++ b/examples/myclient1.cpp @@ -0,0 +1,77 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include + +#include + +#include "types.hpp" +#include "utils.ipp" +#include "client_base.hpp" + +using aedis::command; +using aedis::resp3::request; +using aedis::resp3::type; +using aedis::resp3::response; +using aedis::resp3::response_base; +using aedis::resp3::client_base; + +namespace net = aedis::net; + +class myclient : public client_base { +private: + response resp_; + + response_base& + get_response(type t, command cmd) override + { + return resp_; + } + + void + on_event(type t, command cmd) override + { + std::cout << cmd << ":\n" << resp_ << std::endl; + resp_.clear(); + } + +public: + myclient(net::any_io_executor ex) + : client_base(ex) { } +}; + +// A coroutine that will send command to redis every second. +template +net::awaitable +event_simulator(std::shared_ptr rclient, Filler filler) +{ + auto ex = co_await net::this_coro::executor; + net::steady_timer t{ex}; + + for (;;) { + t.expires_after(std::chrono::seconds{1}); + co_await t.async_wait(net::use_awaitable); + rclient->send(filler); + } +} + +int main() +{ + net::io_context ioc; + auto rclient = std::make_shared(ioc.get_executor()); + rclient->start(); + + auto filler = [](auto& req) + { req.push(command::ping); }; + + for (auto i = 0; i < 1; ++i) + co_spawn(ioc.get_executor(), event_simulator(rclient, filler), net::detached); + + ioc.run(); +} + +/// \example myclient1.cpp diff --git a/examples/utils.ipp b/examples/utils.ipp index cc811c4b..8bd74a9b 100644 --- a/examples/utils.ipp +++ b/examples/utils.ipp @@ -13,8 +13,8 @@ aedis::net::awaitable make_connection( - std::string const& host, - std::string const& port) + std::string host, + std::string port) { auto ex = co_await aedis::net::this_coro::executor; tcp_resolver resolver{ex}; diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index f9ea5fa0..ba7bc41e 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -11,55 +11,25 @@ #include #include #include -#include #include -/** \mainpage My Personal Index Page +/** \mainpage Aedis \section intro_sec Introduction - Aedis is an async redis client built on top of Boost.Asio. It was written with emphasis - on simplicity and avoid imposing any performance penalty on its users. + Aedis is an async redis client built on top of Boost.Asio. It was written + with emphasis on simplicity and avoid imposing any performance penalty on + its users. - The recommended useage looks like the code below. - - \code{.cpp} - - std::queue requests; - requests.push({}); - requests.back().push(command::hello, 3); - ... - - resp3::stream stream{std::move(socket)}; - for (;;) { - resp3::response resp; - co_await stream.async_consume(requests, resp); - - if (resp.get_type() == resp3::type::push) { - continue; // Push type received. Do something and continue. - } - - switch (requests.front().commands.front()) { - case command::hello: - { - prepare_next(requests); - requests.back().push(command::publish, "channel1", "Message to channel1"); - requests.back().push(command::publish, "channel2", "Message to channel2"); - } break; - // ... - } - } - - \endcode - - The function async_consume will write any out outstanding command in the queue if needed. That - makes it possible for users to add new commands without havin to write them explicitly. For example - \section install_sec Installation - - \subsection step1 Step 1: Opening the box - - etc... + + Aedis is a header only library. You only need to include the header + + @code + #include + @endcode + + in one of your source files. */ /** \file aedis.hpp diff --git a/include/aedis/resp3/read.hpp b/include/aedis/resp3/read.hpp index 127dfccc..1ec5ff3d 100644 --- a/include/aedis/resp3/read.hpp +++ b/include/aedis/resp3/read.hpp @@ -21,6 +21,11 @@ namespace aedis { namespace resp3 { +/** \file read.hpp + * + * Read utility functions. + */ + template auto read( SyncReadStream& stream, @@ -71,9 +76,10 @@ read( return n; } -/** @brief Reads one command from the redis response +/** @brief Reads the next command from a redis response * - * Note: This function has to be called once for each command. + * This function has to be called once for each command until the whole + * response has been cosumed. */ template < class AsyncReadStream, @@ -160,6 +166,8 @@ public: } }; +/** \brief Asynchronously reads the type of the next incomming request. + */ template < class AsyncReadStream, class Storage, @@ -178,69 +186,6 @@ auto async_read_type( >(type_op {stream, &buffer}, token, stream); } -template < - class AsyncReadWriteStream, - class Response -> -struct consumer_op { - AsyncReadWriteStream& stream; - std::string& buffer; - std::queue& requests; - Response& resp; - type& m_type; - net::coroutine& coro; - - template - void operator()( - Self& self, - boost::system::error_code const& ec = {}, - type t = type::invalid) - { - reenter (coro) for (;;) - { - // Writes the next request in the queue and possibly some - // more if they contain only push types as response. - yield async_write_some(stream, requests, std::move(self)); - if (ec) { - self.complete(ec, type::invalid); - return; - } - - // Loops on a read while there is nothing to write. - do { - // Loops until a response to each of the commands in the - // pipeline has been received. - do { - yield async_read_type(stream, buffer, std::move(self)); - if (ec) { - self.complete(ec, type::invalid); - return; - } - - m_type = t; - - yield async_read(stream, buffer, resp, std::move(self)); - - if (ec) { - self.complete(ec, type::invalid); - return; - } - - yield self.complete(ec, m_type); - - if (m_type != type::push) - requests.front().commands.pop(); - - } while (!std::empty(requests) && !std::empty(requests.front().commands)); - - if (!std::empty(requests)) - requests.pop(); - - } while (std::empty(requests)); - } - } -}; - } // resp3 } // aedis diff --git a/include/aedis/resp3/response.hpp b/include/aedis/resp3/response.hpp index 5acd5980..990e67db 100644 --- a/include/aedis/resp3/response.hpp +++ b/include/aedis/resp3/response.hpp @@ -15,11 +15,14 @@ namespace aedis { namespace resp3 { -/** A pre-order-view of the response tree. +/** \brief A general pupose redis response class + + A pre-order-view of the response tree. + */ class response : public response_base { public: - /** Represents a node in the response tree. + /** \brief A node in the response tree. */ struct node { enum class dump_format {raw, clean}; diff --git a/include/aedis/resp3/response_base.hpp b/include/aedis/resp3/response_base.hpp index 623def70..ead23a42 100644 --- a/include/aedis/resp3/response_base.hpp +++ b/include/aedis/resp3/response_base.hpp @@ -14,14 +14,20 @@ namespace aedis { namespace resp3 { -/** @brief Response adapter base class. +/** @brief A base class for all response types. * * Users are allowed to override this class to customize responses. */ struct response_base { - virtual void add(type t, std::size_t n, std::size_t depth, char const* data = nullptr, std::size_t size = 0) {} + /** @brief Function called by the parser when new data has been processed. + * + * Users who what to customize their response types are required to derive + * from this class and override this function, see examples. + */ + virtual + void add(type t, std::size_t n, std::size_t depth, char const* data = nullptr, std::size_t size = 0) {} - /** Virtual destructor to allow inheritance. + /** @brief Virtual destructor to allow inheritance. */ virtual ~response_base() {} }; diff --git a/include/aedis/resp3/stream.hpp b/include/aedis/resp3/stream.hpp deleted file mode 100644 index a5a1ef5f..00000000 --- a/include/aedis/resp3/stream.hpp +++ /dev/null @@ -1,158 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#pragma once - -#include - -#include -#include -#include -#include - -namespace aedis { -namespace resp3 { - -/** Reads and writes redis commands. - */ -template -class stream { -public: - /// The type of the next layer. - using next_layer_type = typename std::remove_reference::type; - - /// The type of the executor associated with the object. - using executor_type = typename next_layer_type::executor_type; - -private: - using buffer_type = std::string; - buffer_type buffer_; - net::coroutine coro_ = net::coroutine(); - type type_ = type::invalid; - next_layer_type next_layer_; - -public: - template - explicit stream(Arg&& arg) - : next_layer_(std::forward(arg)) - { } - - stream(stream&& other) = default; - stream& operator=(stream&& other) = delete; - - /// Get the executor associated with the object. - /** - * This function may be used to obtain the executor object that the stream - * uses to dispatch handlers for asynchronous operations. - * - * @return A copy of the executor that stream will use to dispatch handlers. - */ - executor_type get_executor() noexcept - { return next_layer_.get_executor(); } - - /// Get a reference to the next layer. - /** - * This function returns a reference to the next layer in a stack of - * stream layers. - * - * @return A reference to the next layer in the stack of stream - * layers. Ownership is not transferred to the caller. - */ - next_layer_type const& next_layer() const - { return next_layer_; } - - /// Get a reference to the next layer. - /** - * This function returns a reference to the next layer in a stack - * of stream layers. - * - * @return A reference to the next layer in the stack of stream - * layers. Ownership is not transferred to the caller. - */ - next_layer_type& next_layer() - { return next_layer_; } - - /// Writes and reads requests. - /** Performs the following operations - * - * 1. Write one or more requests in the queue (see async_write_some) - * 2. Reads the responses for each command in the request - * individually, returning control to the users. - * - * When there is no more requests to be written it will wait on a - * read. - */ - template> - auto async_consume( - std::queue& requests, - response& resp, - CompletionToken&& token = net::default_completion_token_t{}) - { - return net::async_compose< - CompletionToken, void(boost::system::error_code, type)>( - consumer_op {next_layer_, buffer_, requests, resp, type_, coro_}, - token, next_layer_); - } - - /** @brief Writes one or more requests to the stream. - * - * Sends the last request in the input queue to the server. If the - * next request happens to contain commands the have a push type as - * a response (see subscribe) they will also be sent. - */ - template> - auto async_write_some( - std::queue& requests, - CompletionToken&& token = net::default_completion_token_t{}) - { - return net::async_compose< - CompletionToken, - void(boost::system::error_code)>( - write_some_op{next_layer_, requests}, - token, next_layer_); - } - - template > - auto async_read( - response& resp, - CompletionToken&& token = net::default_completion_token_t{}) - { - return net::async_compose - < CompletionToken - , void(boost::system::error_code) - >(detail::parse_op {next_layer_, &buffer_, &resp}, - token, next_layer_); - } - - /** @brief Sends the request to the server. - */ - template > - auto async_write( - request& req, - CompletionToken&& token = net::default_completion_token_t{}) - { - return net::async_write(next_layer_, net::buffer(req.payload), token); - } - - /** @brief Reads the type of the next request. - */ - template < - class CompletionToken = net::default_completion_token_t - > - auto async_read_type( - CompletionToken&& token = net::default_completion_token_t{}) - { - return net::async_compose - < CompletionToken - , void(boost::system::error_code, type) - >(type_op {next_layer_, &buffer_}, token, next_layer_); - } - -}; - -} // resp3 -} // aedis diff --git a/include/aedis/resp3/type.hpp b/include/aedis/resp3/type.hpp index 2e8dbe90..0681dee4 100644 --- a/include/aedis/resp3/type.hpp +++ b/include/aedis/resp3/type.hpp @@ -15,6 +15,7 @@ namespace aedis { namespace resp3 { /** \file type.hpp + \brief Enum that describes the redis data types and some helper functions. This file contains the enum used to identify the redis data type and some helper functions. diff --git a/include/aedis/resp3/write.hpp b/include/aedis/resp3/write.hpp index edc68aa3..5d60f2a2 100644 --- a/include/aedis/resp3/write.hpp +++ b/include/aedis/resp3/write.hpp @@ -18,6 +18,11 @@ namespace aedis { namespace resp3 { +/** \file write.hpp + * + * Write utility functions. + */ + template std::size_t write( @@ -48,13 +53,14 @@ std::size_t write( return bytes_transferred; } -/* Asynchronously writes one or more requests on the stream. - */ -template +template< + class AsyncWriteStream, + class Queue +> struct write_some_op { AsyncWriteStream& stream; - std::queue& requests; - net::coroutine coro = net::coroutine(); + Queue& reqs; + net::coroutine coro_ = net::coroutine(); void operator()( @@ -62,48 +68,49 @@ struct write_some_op { boost::system::error_code const& ec = {}, std::size_t n = 0) { - reenter (coro) { + reenter (coro_) { do { - assert(!std::empty(requests)); - assert(!std::empty(requests.front().payload)); + assert(!std::empty(reqs)); + assert(!std::empty(reqs.front().payload)); yield async_write( stream, - net::buffer(requests.front().payload), + net::buffer(reqs.front().payload), std::move(self)); if (ec) break; - if (std::empty(requests.front().commands)) { - // We only pop when all commands in the pipeline have push - // responses like subscribe, otherwise, pop is done when the - // response arrives. - requests.pop(); - } - } while (!std::empty(requests) && std::empty(requests.front().commands)); + // Pops the request if no response is expected. + if (std::empty(reqs.front().commands)) + reqs.pop(); + + } while (!std::empty(reqs) && std::empty(reqs.front().commands)); self.complete(ec); } } }; +/** @brief Writes the some request from the queue in the stream. + */ template< class AsyncWriteStream, + class Queue, class CompletionToken = net::default_completion_token_t > auto async_write_some( AsyncWriteStream& stream, - std::queue& requests, + Queue& reqs, CompletionToken&& token = net::default_completion_token_t{}) { return net::async_compose< CompletionToken, void(boost::system::error_code)>( - write_some_op{stream, requests}, + write_some_op{stream, reqs}, token, stream); } diff --git a/tests/general.cpp b/tests/general.cpp index 4c4a02a1..96230dff 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -109,23 +109,24 @@ struct test_general_fill { net::awaitable test_general(net::ip::tcp::resolver::results_type const& res) { + std::queue requests; + requests.push({}); + requests.back().push(command::hello, 3); + test_general_fill filler; + filler(requests.back()); + auto ex = co_await this_coro::executor; net::ip::tcp::socket socket{ex}; co_await net::async_connect(socket, res, net::use_awaitable); - std::queue requests; - requests.push({}); - requests.back().push(command::hello, 3); - - test_general_fill filler; + co_await async_write(socket, requests.back(), net::use_awaitable); + std::string buffer; resp3::response resp; - resp3::stream stream{std::move(socket)}; - int push_counter = 0; for (;;) { resp.clear(); - co_await stream.async_consume(requests, resp, net::use_awaitable); + co_await resp3::async_read(socket, buffer, resp, net::use_awaitable); if (resp.get_type() == resp3::type::push) { switch (push_counter) { @@ -158,12 +159,11 @@ test_general(net::ip::tcp::resolver::results_type const& res) } auto const cmd = requests.front().commands.front(); + requests.front().commands.pop(); switch (cmd) { case command::hello: { - prepare_next(requests); - filler(requests.back()); } break; case command::multi: { @@ -519,7 +519,6 @@ net::awaitable test_set(net::ip::tcp::resolver::results_type const& results) { using namespace aedis; - auto ex = co_await this_coro::executor; // Tests whether the parser can handle payloads that contain the separator. test_bulk1[30] = '\r'; @@ -527,9 +526,6 @@ test_set(net::ip::tcp::resolver::results_type const& results) std::string test_bulk2 = "aaaaa"; - tcp_socket socket {ex}; - co_await async_connect(socket, results); - resp3::request p; p.push(command::hello, 3); p.push(command::flushall); @@ -541,6 +537,10 @@ test_set(net::ip::tcp::resolver::results_type const& results) p.push(command::get, "s"); p.push(command::quit); + auto ex = co_await this_coro::executor; + tcp_socket socket {ex}; + co_await async_connect(socket, results); + co_await async_write(socket, net::buffer(p.payload)); std::string buf; @@ -609,14 +609,6 @@ test_set(net::ip::tcp::resolver::results_type const& results) } } -struct test_handler { - void operator()(boost::system::error_code ec) const - { - if (ec) - std::cout << ec.message() << std::endl; - } -}; - net::awaitable test_simple_string() { using namespace aedis;