From 7fd39496dc2f745c0a5be12a24acaa11f7f4903c Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 13 Dec 2020 09:52:35 +0100 Subject: [PATCH] Make progresses responses. --- examples/async.cpp | 43 +++++++++++++++++- include/aedis/aedis.hpp | 97 ++++++++++++++++++++++++++++++++++------- 2 files changed, 123 insertions(+), 17 deletions(-) diff --git a/examples/async.cpp b/examples/async.cpp index 6aef321c..3ad71615 100644 --- a/examples/async.cpp +++ b/examples/async.cpp @@ -71,11 +71,50 @@ net::awaitable example2() } } +net::awaitable example3() +{ + auto ex = co_await this_coro::executor; + + tcp::resolver resv(ex); + auto const r = resv.resolve("127.0.0.1", "6379"); + + tcp_socket socket {ex}; + co_await async_connect(socket, r); + + resp::pipeline p; + p.flushall(); + p.rpush("key", {1, 2, 3}); + p.lrange("key"); + p.lrange("key"); + p.lrange("key"); + + co_await async_write(socket, buffer(p.payload)); + + resp::buffer buffer; + + resp::response res1; + co_await resp::async_read(socket, buffer, res1); + co_await resp::async_read(socket, buffer, res1); + + resp::response_list res2; + co_await resp::async_read(socket, buffer, res2); + resp::print(res2.result); + + resp::response_list res3; + co_await resp::async_read(socket, buffer, res3); + resp::print(res3.result); + + resp::response_list res4; + co_await resp::async_read(socket, buffer, res4); + resp::print(res4.result); +} + int main() { io_context ioc {1}; - co_spawn(ioc, example1(), detached); - co_spawn(ioc, example2(), detached); + //co_spawn(ioc, example1(), detached); + //co_spawn(ioc, example2(), detached); + co_spawn(ioc, example3(), detached); ioc.run(); } diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index d8241aa4..e2375fa7 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -26,6 +26,7 @@ #include #include #include +#include #include @@ -41,6 +42,61 @@ namespace resp using buffer = std::string; +struct response_throw { + virtual void select_array(int n) { throw std::runtime_error("select_array: Has not been overridden."); } + virtual void select_push(int n) { throw std::runtime_error("select_push: Has not been overridden."); } + virtual void select_set(int n) { throw std::runtime_error("select_set: Has not been overridden."); } + virtual void select_map(int n) { throw std::runtime_error("select_map: Has not been overridden."); } + virtual void select_attribute(int n) { throw std::runtime_error("select_attribute: Has not been overridden."); } + + virtual void on_simple_string(std::string_view s) { throw std::runtime_error("on_simple_string: Has not been overridden."); } + virtual void on_simple_error(std::string_view s) { throw std::runtime_error("on_simple_error: Has not been overridden."); } + virtual void on_number(std::string_view s) { throw std::runtime_error("on_number: Has not been overridden."); } + virtual void on_double(std::string_view s) { throw std::runtime_error("on_double: Has not been overridden."); } + virtual void on_bool(std::string_view s) { throw std::runtime_error("on_bool: Has not been overridden."); } + virtual void on_big_number(std::string_view s) { throw std::runtime_error("on_big_number: Has not been overridden."); } + virtual void on_null() { throw std::runtime_error("on_null: Has not been overridden."); } + virtual void on_blob_error(std::string_view s = {}) { throw std::runtime_error("on_blob_error: Has not been overridden."); } + virtual void on_verbatim_string(std::string_view s = {}) { throw std::runtime_error("on_verbatim_string: Has not been overridden."); } + virtual void on_blob_string(std::string_view s = {}) { throw std::runtime_error("on_blob_string: Has not been overridden."); } + virtual void on_streamed_string_part(std::string_view s = {}) { throw std::runtime_error("on_streamed_string_part: Has not been overridden."); } +}; + +template +T to_number(std::string_view s) +{ + T n; + auto r = std::from_chars(s.data(), + s.data() + s.size(), + n); + + if (r.ec == std::errc::invalid_argument) + throw std::runtime_error("Unable to convert"); + + return n; +} + +template +struct converter { + template ::value, T>::type> + static T apply(std::string_view s, U = {}) + { return to_number(s); } +}; + +template <> +struct converter { + static std::string apply(std::string_view s) + { return std::string {s}; } +}; + +template > +struct response_list : response_throw { + std::list result; + void select_array(int n) override { } + void on_blob_string(std::string_view s) override + { result.push_back(converter::apply(s)); } +}; + // General purpose response. Copies the string reponses in the result // vector. struct response_vector { @@ -87,7 +143,16 @@ std::size_t length(char const* p) return len; } -void print(std::vector const& v) +template +void print(std::vector const& v) +{ + for (auto const& o : v) + std::cout << o << " "; + std::cout << std::endl; +} + +template +void print(std::list const& v) { for (auto const& o : v) std::cout << o << " "; @@ -109,6 +174,7 @@ void print_command_raw(std::string const& data, int n) } } +template class parser { public: enum class bulk @@ -120,7 +186,7 @@ public: }; private: - resp::response* res_ = nullptr; + Response* res_ = nullptr; int depth_ = 0; int sizes_[6] = {2, 1, 1, 1, 1, 1}; // Streaming will require a bigger integer. bulk bulk_ = bulk::none; @@ -233,7 +299,7 @@ private: } public: - parser(resp::response* res) + parser(Response* res) : res_ {res} {} @@ -287,16 +353,16 @@ public: // The parser supports up to 5 levels of nested structures. The first // element in the sizes stack is a sentinel and must be different from // 1. -template +template class parse_op { private: AsyncReadStream& stream_; resp::buffer* buf_ = nullptr; - parser parser_; + parser parser_; int start_ = 1; public: - parse_op(AsyncReadStream& stream, resp::buffer* buf, resp::response* res) + parse_op(AsyncReadStream& stream, resp::buffer* buf, Response* res) : stream_ {stream} , buf_ {buf} , parser_ {res} @@ -309,7 +375,7 @@ public: { switch (start_) { for (;;) { - if (parser_.bulk() == parser::bulk::none) { + if (parser_.bulk() == parser::bulk::none) { case 1: start_ = 0; net::async_read_until( @@ -359,18 +425,18 @@ public: } }; -template +template auto read( SyncReadStream& stream, resp::buffer& buf, - resp::response& res, + Response& res, boost::system::error_code& ec) { - parser p {&res}; + parser p {&res}; std::size_t n = 0; goto start; do { - if (p.bulk() == parser::bulk::none) { + if (p.bulk() == parser::bulk::none) { start: n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec); if (ec || n < 3) @@ -394,12 +460,12 @@ start: return n; } -template +template std::size_t read( SyncReadStream& stream, resp::buffer& buf, - resp::response& res) + Response& res) { boost::system::error_code ec; auto const n = read(stream, buf, res, ec); @@ -412,20 +478,21 @@ read( template < class AsyncReadStream, + class Response, class CompletionToken = net::default_completion_token_t > auto async_read( AsyncReadStream& stream, resp::buffer& buffer, - resp::response& res, + Response& res, CompletionToken&& token = net::default_completion_token_t{}) { return net::async_compose < CompletionToken , void(boost::system::error_code) - >(parse_op {stream, &buffer, &res}, + >(parse_op {stream, &buffer, &res}, token, stream); }