From a15e438e52a42484a232fe4f6c206693dbb7561d Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 25 Dec 2021 16:12:52 +0100 Subject: [PATCH] Async read returns the number of bytes consumed. --- INSTALL | 2 +- examples/client_base.hpp | 39 ++++++++++++------- examples/echo_server.cpp | 30 +++++++++++--- examples/intro.cpp | 18 +++++---- examples/key_expiration.cpp | 4 +- examples/response_queue.cpp | 4 +- include/aedis/resp3/detail/read_ops.hpp | 17 +++++--- .../aedis/resp3/detail/response_adapters.hpp | 19 ++++++--- include/aedis/resp3/read.hpp | 10 ++--- include/aedis/resp3/write.hpp | 3 ++ tests/general.cpp | 2 +- 11 files changed, 98 insertions(+), 50 deletions(-) diff --git a/INSTALL b/INSTALL index 63afdc8a..2e8cdfa5 100644 --- a/INSTALL +++ b/INSTALL @@ -1 +1 @@ -CC=/opt/gcc-10.2.0/bin/gcc-10.2.0 CXX=/opt/gcc-10.2.0/bin/g++-10.2.0 CXXFLAGS="-std=c++20 -fcoroutines -g" ./configure --with-boost=/opt/boost_1_78_0 --with-boost-libdir=/opt/boost_1_78_0/lib +CC=/opt/gcc-10.2.0/bin/gcc-10.2.0 CXX=/opt/gcc-10.2.0/bin/g++-10.2.0 CXXFLAGS="-std=c++20 -fcoroutines -g -Wall -Werror" ./configure --with-boost=/opt/boost_1_78_0 --with-boost-libdir=/opt/boost_1_78_0/lib diff --git a/examples/client_base.hpp b/examples/client_base.hpp index 4141a8c2..b4388e9f 100644 --- a/examples/client_base.hpp +++ b/examples/client_base.hpp @@ -17,23 +17,34 @@ using namespace aedis::net::experimental::awaitable_operators; namespace aedis { namespace resp3 { -/* Example: A general purpose redis client. - * - * This client supports many features. +/* Example: A general purpose redis client. + + This class is meant to be an example. Users are meant to derive + from this class and override + + 1. on_event. + 2. on_push. + + The ReponseId type is required to provide the get_command() member + function. */ -template -class client_base : public std::enable_shared_from_this> { +template +class client_base + : public std::enable_shared_from_this> { protected: // The response used for push types. - std::vector push_resp_; + std::vector push_resp_; + + // Hello response. + std::vector hello_; private: using tcp_socket = net::use_awaitable_t<>::as_default_on_t; - std::queue> srs_; + std::queue> srs_; tcp_socket socket_; - // A timer used to inform the write coroutine that it can write the next - // message in the output queue. + // Timer used to inform the write coroutine that it can write the + // next message in the output queue. net::steady_timer timer_; // A coroutine that keeps reading the socket. When a message @@ -91,11 +102,9 @@ private: co_await async_write(socket_, net::buffer(sr.request())); std::string buffer; - co_await async_read(socket_, buffer, adapt()); - // TODO: Set the information retrieved from hello. + co_await async_read(socket_, buffer, adapt(hello_)); } - // The connection manager. It keeps trying the reconnect to the // server when the connection is lost. net::awaitable connection_manager() @@ -125,7 +134,7 @@ private: * If true is returned the request in the front of the queue can be * sent to the server. See async_write_some. */ - bool prepare_next(std::queue>& reqs) + bool prepare_next(std::queue>& reqs) { if (std::empty(reqs)) { reqs.push({}); @@ -150,7 +159,7 @@ public: // Destructor. virtual ~client_base() { } - /* \brief Starts the client. + /* Starts the client. * * Stablishes a connection with the redis server and keeps waiting for messages to send. */ @@ -192,7 +201,7 @@ public: * * Override this function to receive events in your derived class. */ - virtual void on_event(QueueElem qe) {}; + virtual void on_event(ResponseId) {}; /* \brief Called when server push is received. * diff --git a/examples/echo_server.cpp b/examples/echo_server.cpp index 1e3a8310..43e696ff 100644 --- a/examples/echo_server.cpp +++ b/examples/echo_server.cpp @@ -12,21 +12,39 @@ using aedis::resp3::client_base; using aedis::resp3::response_traits; using aedis::resp3::node; +// Base class for user sessions. struct user_session_base { virtual ~user_session_base() {} virtual void on_event(command cmd) = 0; }; -struct queue_elem { +// struct to hold information that we need when the response to a +// command is received. See client_base.hpp for more details on the +// required fields in this struct. +struct response_id { + // The type of the adapter that should be used to deserialize the + // response. + using adapter_type = response_traits>::adapter_type; + + // The type of the session pointer. + using session_ptr = std::weak_ptr; + + // The redis command that was send in the request. command cmd = command::unknown; - response_traits>::adapter_type adapter; - std::weak_ptr session = std::shared_ptr{nullptr}; + + // The adapter. + adapter_type adapter; + + // The pointer to the session the request belong to. + session_ptr session = std::shared_ptr{nullptr}; + + // Required from client_base.hpp. auto get_command() const noexcept { return cmd; } }; -class my_redis_client : public client_base { +class my_redis_client : public client_base { private: - void on_event(queue_elem qe) override + void on_event(response_id qe) override { if (auto session = qe.session.lock()) { session->on_event(qe.cmd); @@ -37,7 +55,7 @@ private: public: my_redis_client(net::any_io_executor ex) - : client_base(ex) + : client_base(ex) {} }; diff --git a/examples/intro.cpp b/examples/intro.cpp index 60fb93f9..899f5cb0 100644 --- a/examples/intro.cpp +++ b/examples/intro.cpp @@ -37,6 +37,7 @@ net::awaitable ping() // Creates and sends the request. serializer sr; sr.push(command::hello, 3); + sr.push(command::flushall); sr.push(command::ping); sr.push(command::incr, "key"); sr.push(command::quit); @@ -44,21 +45,22 @@ net::awaitable ping() // Expected responses. int incr; - std::string ping, quit; + std::string ping; // Reads the responses. std::string buffer; - co_await async_read(socket, buffer); // hello (ignored) - co_await async_read(socket, buffer, adapt(ping)); - co_await async_read(socket, buffer, adapt(incr)); - co_await async_read(socket, buffer, adapt(quit)); + std::size_t n = 0; + n += co_await async_read(socket, buffer); // hello (ignored) + n += co_await async_read(socket, buffer); // flushall + n += co_await async_read(socket, buffer, adapt(ping)); + n += co_await async_read(socket, buffer, adapt(incr)); + n += co_await async_read(socket, buffer); // Print the responses. std::cout + << "Bytes read: " << n << "\n" << "ping: " << ping << "\n" - << "incr: " << incr << "\n" - << "quit: " << quit - << std::endl; + << "incr: " << incr << "\n"; } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/key_expiration.cpp b/examples/key_expiration.cpp index 24cf4a29..e5d6e1e7 100644 --- a/examples/key_expiration.cpp +++ b/examples/key_expiration.cpp @@ -22,7 +22,9 @@ namespace net = aedis::net; using net::async_write; using net::buffer; -/* When accessing a key that does not exist, for example due to +/* Shows how to deal with keys that may not exist. + + When accessing a key that does not exist, for example due to expiration, redis will return null. Aedis supports these usecases through std::optional. */ diff --git a/examples/response_queue.cpp b/examples/response_queue.cpp index 5c27d3bd..bcc3ca7b 100644 --- a/examples/response_queue.cpp +++ b/examples/response_queue.cpp @@ -22,7 +22,7 @@ using net::buffer; /* Processes the responses in a loop using the helper queue. - In most cases commands will be added dynamically in the request for + In most cases commands will be added dynamically to the request for example as users interact with the code. In order to process the responses asynchronously users have to keep a queue of the expected commands or use the one provided by the serializer class. @@ -40,7 +40,7 @@ net::awaitable ping() sr.push(command::quit); co_await async_write(socket, buffer(sr.request())); - // Expected responses (ignoring hello). + // Expected responses std::string ping, quit; // Reads the responses. diff --git a/include/aedis/resp3/detail/read_ops.hpp b/include/aedis/resp3/detail/read_ops.hpp index a5f16805..ccd1d472 100644 --- a/include/aedis/resp3/detail/read_ops.hpp +++ b/include/aedis/resp3/detail/read_ops.hpp @@ -17,6 +17,7 @@ namespace detail { // The parser supports up to 5 levels of nested structures. The first // element in the sizes stack is a sentinel and must be different from // 1. +// TODO: Use asio::coroutine. template < class AsyncReadStream, class Buffer, @@ -24,15 +25,18 @@ template < class parse_op { private: AsyncReadStream& stream_; - Buffer* buf_ = nullptr; + Buffer* buf_; parser parser_; - int start_ = 1; + std::size_t consumed_; + int start_; public: parse_op(AsyncReadStream& stream, Buffer* buf, ResponseAdapter adapter) : stream_ {stream} , buf_ {buf} , parser_ {adapter} + , consumed_{0} + , start_{1} { } template @@ -61,7 +65,7 @@ public: // last time. If it is, there is no need of initiating // another async op otherwise we have to read the // missing bytes. - if (std::ssize(*buf_) < (parser_.bulk_length() + 2)) { + if (std::size(*buf_) < (parser_.bulk_length() + 2)) { start_ = 0; auto const s = std::ssize(*buf_); auto const l = parser_.bulk_length(); @@ -78,12 +82,13 @@ public: default: { if (ec) - return self.complete(ec); + return self.complete(ec, 0); n = parser_.advance(buf_->data(), n); buf_->erase(0, n); + consumed_ += n; if (parser_.done()) - return self.complete({}); + return self.complete({}, consumed_); } } } @@ -109,6 +114,8 @@ public: , boost::system::error_code ec = {} , std::size_t n = 0) { + boost::ignore_unused(n); + if (ec) { self.complete(ec, type::invalid); return; diff --git a/include/aedis/resp3/detail/response_adapters.hpp b/include/aedis/resp3/detail/response_adapters.hpp index 17070007..86729c12 100644 --- a/include/aedis/resp3/detail/response_adapters.hpp +++ b/include/aedis/resp3/detail/response_adapters.hpp @@ -122,6 +122,8 @@ public: std::size_t data_size) { assert(!is_aggregate(t)); + assert(aggregate_size == 1); + assert(depth == 0); from_string(*result_, data, data_size); } }; @@ -142,15 +144,17 @@ public: char const* data, std::size_t data_size) { - assert(!is_aggregate(t)); + assert(!is_aggregate(t)); + assert(aggregate_size == 1); + assert(depth == 0); - if (t == type::null) - return; + if (t == type::null) + return; - if (!result_->has_value()) - *result_ = T{}; + if (!result_->has_value()) + *result_ = T{}; - from_string(result_->value(), data, data_size); + from_string(result_->value(), data, data_size); } }; @@ -173,9 +177,12 @@ public: std::size_t data_size) { if (is_aggregate(t)) { + assert(depth == 0); auto const m = element_multiplicity(t); result_->resize(m * aggregate_size); } else { + assert(depth == 1); + assert(aggregate_size == 1); from_string(result_->at(i_), data, data_size); ++i_; } diff --git a/include/aedis/resp3/read.hpp b/include/aedis/resp3/read.hpp index fbd01f42..893185ec 100644 --- a/include/aedis/resp3/read.hpp +++ b/include/aedis/resp3/read.hpp @@ -91,10 +91,10 @@ read( return n; } -/** @brief Reads the response to a reads command. - * - * This function has to be called once for each command in the request until - * the whole request has been consumed. +/** @brief Reads the response to a Redis command. + + This function has to be called once for each command in the + request until the whole request has been consumed. */ template < class AsyncReadStream, @@ -111,7 +111,7 @@ auto async_read( { return net::async_compose < CompletionToken - , void(boost::system::error_code) + , void(boost::system::error_code, std::size_t) >(detail::parse_op {stream, &buffer, adapter}, token, stream); diff --git a/include/aedis/resp3/write.hpp b/include/aedis/resp3/write.hpp index 812c235a..4ee9cefb 100644 --- a/include/aedis/resp3/write.hpp +++ b/include/aedis/resp3/write.hpp @@ -18,6 +18,7 @@ namespace aedis { namespace resp3 { +// TODO: return the number of bytes written. template< class AsyncWriteStream, class Queue @@ -33,6 +34,8 @@ struct write_some_op { boost::system::error_code const& ec = {}, std::size_t n = 0) { + boost::ignore_unused(n); + reenter (coro_) { do { assert(!std::empty(reqs)); diff --git a/tests/general.cpp b/tests/general.cpp index 399c4f05..8f7e67e3 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -1009,7 +1009,7 @@ net::awaitable test_streamed_string() // //} //} -int main(int argc, char* argv[]) +int main() { net::io_context ioc {1}; tcp::resolver resv(ioc);