From 05eb2191d5b3c0cc4fb89460e7fc950c063bae2b Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Mon, 11 Jan 2021 13:18:45 +0100 Subject: [PATCH] Changes: - Small fixes. - Adds function async_read_type. - Lots of simplifications. - First steps with new easy design. - New design supporting transactions. --- examples/async_basic.cpp | 25 +-- examples/async_events.cpp | 4 +- examples/async_pubsub.cpp | 177 +++++++++++++++--- include/aedis/read.hpp | 127 +++++++++++++ include/aedis/request.hpp | 70 ++++++- include/aedis/response.hpp | 361 ++++++++++++++----------------------- tests/general.cpp | 16 +- 7 files changed, 501 insertions(+), 279 deletions(-) diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index d3b56b15..37b84de4 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -24,9 +24,11 @@ net::awaitable example() auto ex = co_await this_coro::executor; resp::request p; + p.multi(); p.hello(); p.rpush("list", {1, 2, 3}); p.lrange("list"); + p.exec(); p.quit(); tcp::resolver resv(ex); @@ -35,24 +37,11 @@ net::awaitable example() co_await net::async_write(socket, net::buffer(p.payload)); std::string buffer; - resp::response_flat_map hello; - co_await resp::async_read(socket, buffer, hello); - - resp::response_number rpush; - co_await resp::async_read(socket, buffer, rpush); - std::cout << rpush.result << std::endl; - - resp::response_list lrange; - co_await resp::async_read(socket, buffer, lrange); - print(lrange.result); - - resp::response_simple_string quit; - co_await resp::async_read(socket, buffer, quit); - std::cout << quit.result << std::endl; - - resp::response_ignore eof; - co_await resp::async_read(socket, buffer, eof); - + for (;;) { + resp::response_array hello; + co_await resp::async_read(socket, buffer, hello); + print(hello.result); + } } catch (std::exception const& e) { std::cerr << e.what() << std::endl; } diff --git a/examples/async_events.cpp b/examples/async_events.cpp index 7cf4c168..ace856c2 100644 --- a/examples/async_events.cpp +++ b/examples/async_events.cpp @@ -42,13 +42,13 @@ net::awaitable example() { resp::response_list res; co_await resp::async_read(socket, buffer, res); - print(res.result); + print(res.result, "Interesting1"); } break; case myevents::interesting2: { resp::response_set res; co_await resp::async_read(socket, buffer, res); - print(res.result); + print(res.result, "Interesting2"); } break; default: { diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp index a3219343..30ae10a4 100644 --- a/examples/async_pubsub.cpp +++ b/examples/async_pubsub.cpp @@ -8,6 +8,8 @@ #include #include +#include + /* Implements a coroutine that writes commands in interval and one the * reads the commands. */ @@ -20,27 +22,59 @@ using tcp = net::ip::tcp; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using stimer = net::use_awaitable_t<>::as_default_on_t; -net::awaitable publisher(tcp_socket& socket) +struct receiver { + void on_hello(std::vector v) + { std::cout << "hello" << std::endl; } + void on_lrange(std::vector v) + { std::cout << "lrange " << std::size(v) << std::endl; } + void on_subscribe(std::string& v) + { std::cout << "subscribe" << std::endl; } + void on_ping(std::string v) + { std::cout << "ping" << std::endl; } + void on_set(std::string v) + { std::cout << "set " << v << std::endl; } + void on_flushall(std::string v) + { std::cout << "flushall " << v << std::endl; } + void on_get(std::string v) + { std::cout << "get " << v << std::endl; } + void on_quit(std::string v) + { std::cout << "quit " << v << std::endl; } + void on_rpush(int v) + { std::cout << "rpush " << v << std::endl; } + void on_publish(int v) + { std::cout << "publish" << std::endl; } + void on_push(std::vector v) + { std::cout << "push" << std::endl; } + void on_del(int v) + { std::cout << "del" << std::endl; } +}; + +net::awaitable +publisher(tcp_socket& socket, resp::request& req) { auto ex = co_await this_coro::executor; try { - resp::request req; req.hello(); + req.flushall(); req.subscribe("channel"); req.subscribe("__keyspace@0__:user:*"); + req.ping(); + req.set("aaaa", {std::to_string(1)}); + req.get("aaaa"); + req.del("aaaa"); + req.rpush("user:Marcelo", {1, 2, 3}); + req.lrange("user:Marcelo"); + req.publish("channel", "Some message"); + req.multi(); + req.lrange("user:Marcelo"); + req.exec(); + req.set("aaaa", {std::to_string(2)}); + req.get("aaaa"); + req.quit(); - std::string buffer; - for (auto i = 0;; ++i) { - req.ping(); - req.rpush("user:Marcelo", {i}); - req.publish("channel", "Some message"); - - co_await async_write(socket, req); - req.clear(); - - stimer timer(ex, std::chrono::seconds{2}); - co_await timer.async_wait(); - } + co_await async_write(socket, req); + stimer timer(ex, std::chrono::seconds{2}); + co_await timer.async_wait(); } catch (std::exception const& e) { std::cerr << "Error: " << e.what() << std::endl; } @@ -54,16 +88,117 @@ net::awaitable subscriber() auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_spawn(ex, publisher(socket), net::detached); + resp::request req; + co_spawn(ex, publisher(socket, req), net::detached); std::string buffer; + receiver recv; + resp::responses resps; for (;;) { - resp::response_array res; - co_await resp::async_read(socket, buffer, res); - if (res.is_push()) - print(res.push().value, "Push"); - else - print(res.result, "Message"); + resp::type type; + co_await async_read_type(socket, buffer, type); + auto const cmd = req.events.front().first; + + // The next two ifs are used to deal with pipelines. + auto const is_multi = cmd == resp::command::multi; + auto const is_exec = cmd == resp::command::exec; + auto const trans_empty = std::empty(resps.trans); + if (is_multi || (!trans_empty && !is_exec)) { + auto const* res = cmd == resp::command::multi ? "OK" : "QUEUED"; + co_await resp::async_read(socket, buffer, resps.blob_string); + assert(resps.blob_string.result == res); + resps.trans.push(req.events.front().first); + req.events.pop(); + continue; + } + + if (cmd == resp::command::exec) { + co_await resp::async_read(socket, buffer, resps.depth1); + assert(resps.trans.front() == resp::command::multi); + resps.trans.pop(); + for (auto i = 0; i < std::ssize(resps.trans); ++i) { + switch (resps.trans.front()) { + case resp::command::lrange: recv.on_lrange(std::move(resps.depth1.at(i))); break; + default: {assert(false);} + } + resps.trans.pop(); + } + resps.depth1.clear(); + assert(std::empty(resps.trans)); + req.events.pop(); // exec + continue; + } + + switch (type) { + case resp::type::push: + { + co_await resp::async_read(socket, buffer, resps.push); + recv.on_push(std::move(resps.push.result)); + resps.push.result.clear(); + } break; + case resp::type::simple_string: + { + co_await resp::async_read(socket, buffer, resps.simple_string); + switch (cmd) { + case resp::command::set: recv.on_set(std::move(resps.simple_string.result)); break; + case resp::command::ping: recv.on_ping(std::move(resps.simple_string.result)); break; + case resp::command::flushall: recv.on_flushall(std::move(resps.simple_string.result)); break; + case resp::command::quit: recv.on_quit(std::move(resps.simple_string.result)); break; + default: {assert(false);} + } + resps.simple_string.result.clear(); + } break; + case resp::type::blob_string: + { + co_await resp::async_read(socket, buffer, resps.blob_string); + switch (cmd) { + case resp::command::get: recv.on_get(std::move(resps.blob_string.result)); break; + default: { assert(false); } + } + resps.blob_string.result.clear(); + } break; + case resp::type::map: + { + co_await resp::async_read(socket, buffer, resps.map); + switch (cmd) { + case resp::command::hello: recv.on_hello(std::move(resps.map.result)); break; + default: {assert(false);} + } + resps.map.result = {}; + } break; + case resp::type::array: + { + co_await resp::async_read(socket, buffer, resps.array); + switch (cmd) { + case resp::command::lrange: recv.on_lrange(std::move(resps.array.result)); break; + default: { assert(false); } + } + resps.array.result.clear(); + } break; + case resp::type::set: + { + co_await resp::async_read(socket, buffer, resps.set); + resps.set.result.clear(); + } break; + case resp::type::number: + { + co_await resp::async_read(socket, buffer, resps.number); + switch (cmd) { + case resp::command::rpush: recv.on_rpush(resps.number.result); break; + case resp::command::publish: recv.on_publish(resps.number.result); break; + case resp::command::del: recv.on_del(resps.number.result); break; + default: { assert(false); } + } + } break; + default: { assert(false); + } + } + + if (type != resp::type::push) + req.events.pop(); + + if (std::empty(req.events)) + req.clear(); } } catch (std::exception const& e) { std::cout << e.what() << std::endl; diff --git a/include/aedis/read.hpp b/include/aedis/read.hpp index 17456c5d..e2c2d19e 100644 --- a/include/aedis/read.hpp +++ b/include/aedis/read.hpp @@ -224,6 +224,7 @@ public: { return bulk_length_; } }; +inline void print_command_raw(std::string const& data, int n) { for (int i = 0; i < n; ++i) { @@ -393,5 +394,131 @@ auto async_read( stream); } +enum class type +{ array +, push +, set +, map +, attribute +, simple_string +, simple_error +, number +, double_type +, boolean +, big_number +, null +, blob_error +, verbatim_string +, blob_string +, streamed_string_part +, invalid +}; + +#define EXPAND_TYPE_CASE(x) case type::x: return #x + +inline +auto to_string(type t) +{ + switch (t) { + EXPAND_TYPE_CASE(array); + EXPAND_TYPE_CASE(push); + EXPAND_TYPE_CASE(set); + EXPAND_TYPE_CASE(map); + EXPAND_TYPE_CASE(attribute); + EXPAND_TYPE_CASE(simple_string); + EXPAND_TYPE_CASE(simple_error); + EXPAND_TYPE_CASE(number); + EXPAND_TYPE_CASE(double_type); + EXPAND_TYPE_CASE(boolean); + EXPAND_TYPE_CASE(big_number); + EXPAND_TYPE_CASE(null); + EXPAND_TYPE_CASE(blob_error); + EXPAND_TYPE_CASE(verbatim_string); + EXPAND_TYPE_CASE(blob_string); + EXPAND_TYPE_CASE(streamed_string_part); + EXPAND_TYPE_CASE(invalid); + default: assert(false); + } +} + +inline +auto to_type(char c) +{ + switch (c) { + case '!': return type::blob_error; + case '=': return type::verbatim_string; + case '$': return type::blob_string; + case ';': return type::streamed_string_part; + case '-': return type::simple_error; + case ':': return type::number; + case ',': return type::double_type; + case '#': return type::boolean; + case '(': return type::big_number; + case '+': return type::simple_string; + case '_': return type::null; + case '>': return type::push; + case '~': return type::set; + case '*': return type::array; + case '|': return type::attribute; + case '%': return type::map; + default: return type::invalid; + } +} + +template < + class AsyncReadStream, + class Storage> +class type_op { +private: + AsyncReadStream& stream_; + Storage* buf_ = nullptr; + type* t_; + +public: + type_op(AsyncReadStream& stream, Storage* buf, type* t) + : stream_ {stream} + , buf_ {buf} + , t_ {t} + { } + + template + void operator()( Self& self + , boost::system::error_code ec = {} + , std::size_t n = 0) + { + if (std::empty(*buf_)) { + net::async_read_until( + stream_, + net::dynamic_buffer(*buf_), + "\r\n", + std::move(self)); + } else { + *t_ = to_type(buf_->front()); + return self.complete(ec); + } + } +}; + +template < + class AsyncReadStream, + class Storage, + class CompletionToken = + net::default_completion_token_t + > +auto async_read_type( + AsyncReadStream& stream, + Storage& buffer, + type& t, + CompletionToken&& token = + net::default_completion_token_t{}) +{ + return net::async_compose + < CompletionToken + , void(boost::system::error_code) + >(type_op {stream, &buffer, &t}, + token, + stream); +} + } // resp } // aedis diff --git a/include/aedis/request.hpp b/include/aedis/request.hpp index 38b3714d..8ef126bb 100644 --- a/include/aedis/request.hpp +++ b/include/aedis/request.hpp @@ -141,7 +141,7 @@ enum class command , exec , expire , flushall -, get +, get // 10 , hello , hget , hgetall @@ -151,7 +151,7 @@ enum class command , hmget , hset , hvals -, incr +, incr // 20 , keys , llen , lpop @@ -161,7 +161,7 @@ enum class command , multi , ping , psubscribe -, publish +, publish // 30 , quit , role , rpush @@ -171,13 +171,68 @@ enum class command , set , smembers , subscribe -, unsubscribe +, unsubscribe // 40 , zadd , zrange , zrangebyscore , zremrangebyscore }; +#define EXPAND_COMMAND_CASE(x) case command::x: return #x + +inline +auto const* to_string(command c) +{ + switch (c) { + EXPAND_COMMAND_CASE(append); + EXPAND_COMMAND_CASE(auth); + EXPAND_COMMAND_CASE(bgrewriteaof); + EXPAND_COMMAND_CASE(bgsave); + EXPAND_COMMAND_CASE(bitcount); + EXPAND_COMMAND_CASE(client); + EXPAND_COMMAND_CASE(del); + EXPAND_COMMAND_CASE(exec); + EXPAND_COMMAND_CASE(expire); + EXPAND_COMMAND_CASE(flushall); + EXPAND_COMMAND_CASE(get); + EXPAND_COMMAND_CASE(hello); + EXPAND_COMMAND_CASE(hget); + EXPAND_COMMAND_CASE(hgetall); + EXPAND_COMMAND_CASE(hincrby); + EXPAND_COMMAND_CASE(hkeys); + EXPAND_COMMAND_CASE(hlen); + EXPAND_COMMAND_CASE(hmget); + EXPAND_COMMAND_CASE(hset); + EXPAND_COMMAND_CASE(hvals); + EXPAND_COMMAND_CASE(incr); + EXPAND_COMMAND_CASE(keys); + EXPAND_COMMAND_CASE(llen); + EXPAND_COMMAND_CASE(lpop); + EXPAND_COMMAND_CASE(lpush); + EXPAND_COMMAND_CASE(lrange); + EXPAND_COMMAND_CASE(ltrim); + EXPAND_COMMAND_CASE(multi); + EXPAND_COMMAND_CASE(ping); + EXPAND_COMMAND_CASE(psubscribe); + EXPAND_COMMAND_CASE(publish); + EXPAND_COMMAND_CASE(quit); + EXPAND_COMMAND_CASE(role); + EXPAND_COMMAND_CASE(rpush); + EXPAND_COMMAND_CASE(sadd); + EXPAND_COMMAND_CASE(scard); + EXPAND_COMMAND_CASE(sentinel); + EXPAND_COMMAND_CASE(set); + EXPAND_COMMAND_CASE(smembers); + EXPAND_COMMAND_CASE(subscribe); + EXPAND_COMMAND_CASE(unsubscribe); + EXPAND_COMMAND_CASE(zadd); + EXPAND_COMMAND_CASE(zrange); + EXPAND_COMMAND_CASE(zrangebyscore); + EXPAND_COMMAND_CASE(zremrangebyscore); + default: assert(false); + } +} + enum class event {ignore}; // TODO: Make the write functions friend of this class and make the @@ -189,6 +244,7 @@ public: std::queue> events; public: + bool empty() const noexcept { return std::empty(payload); }; void clear() { payload.clear(); @@ -273,7 +329,8 @@ public: Event e = Event::ignore) { resp::assemble(payload, "SUBSCRIBE", key); - events.push({command::subscribe, e}); + // It looks like in resp3 there is not response for subscribe. + //events.push({command::subscribe, e}); } void @@ -531,7 +588,8 @@ public: std::string_view value, Event e = Event::ignore) { - auto par = {std::to_string(score), value}; + std::initializer_list par = + {std::to_string(score), value}; resp::assemble(payload, "ZADD", {key}, std::cbegin(par), std::cend(par)); events.push({command::zadd, e}); } diff --git a/include/aedis/response.hpp b/include/aedis/response.hpp index 1925bad1..c907d7ed 100644 --- a/include/aedis/response.hpp +++ b/include/aedis/response.hpp @@ -45,6 +45,7 @@ from_string_view(std::string_view s, T& n) throw std::runtime_error("from_chars: Unable to convert"); } +inline void from_string_view(std::string_view s, std::string& r) { r = s; } @@ -71,10 +72,45 @@ struct response_ignore { ~response_ignore() {} }; -enum class error -{ simple_error -, blob_error -, none +// To receive transactions with responses the are not recursive +// themselves. +class response_depth1 { +private: + int i_ = 0; + std::vector> resps_; + void add_element() + { + if (++i_ > 1) + resps_.push_back({}); + } + + void add(std::string_view s) { resps_.back().push_back(std::string{s}); } + +public: + void clear() + { resps_.clear(); } + + auto& at(int i) { return resps_.at(i); } + auto const& at(int i) const { return resps_.at(i); } + + void pop() {} + void select_array(int n) {add_element();} + void select_push(int n) {add_element();} + void select_set(int n) {add_element();} + void select_map(int n) {add_element();} + void select_attribute(int n) {add_element();} + + void on_simple_string(std::string_view s) {add(s);} + void on_simple_error(std::string_view s) {add(s);} + void on_number(std::string_view s) {add(s);} + void on_double(std::string_view s) {add(s);} + void on_bool(std::string_view s) {add(s);} + void on_big_number(std::string_view s) {add(s);} + void on_null() {add({});} + void on_blob_error(std::string_view s = {}) {add(s);} + void on_verbatim_string(std::string_view s = {}) {add(s);} + void on_blob_string(std::string_view s = {}) {add(s);} + void on_streamed_string_part(std::string_view s = {}) {add(s);} }; enum class aggregate_type @@ -86,48 +122,23 @@ enum class aggregate_type , none }; -class push { -private: - void add(std::string_view s) - { - value.push_back({}); - from_string_view(s, value.back()); - } - -public: - void on_simple_string(std::string_view s) { add(s);} - void on_number(std::string_view s) { add(s);} - void on_double(std::string_view s) { add(s);} - void on_bool(std::string_view s) { add(s);} - void on_big_number(std::string_view s) { add(s);} - void on_verbatim_string(std::string_view s) { add(s);} - void on_blob_string(std::string_view s) { add(s);} - - std::vector value; -}; - -template class response_base { -public: - using push_type = Push; - private: - bool is_null_ = false; - error error_ = error::none; - std::string error_msg_; - bool is_push_ = false; - push_type push_; - - // The first element is the sentinel. - std::vector aggregates_ {aggregate_type::attribute}; + // TODO: Use the type enum in read.hpp and a static_array. The + // size of the array must be the same as that of the parser. + std::vector aggregates_; protected: virtual void on_simple_string_impl(std::string_view s) { throw std::runtime_error("on_simple_string_impl: Has not been overridden."); } + virtual void on_simple_error_impl(std::string_view s) + { throw std::runtime_error("on_simple_error_impl: Has not been overridden."); } virtual void on_number_impl(std::string_view s) { throw std::runtime_error("on_number_impl: Has not been overridden."); } virtual void on_double_impl(std::string_view s) { throw std::runtime_error("on_double_impl: Has not been overridden."); } + virtual void on_null_impl() + { throw std::runtime_error("on_null_impl: Has not been overridden."); } virtual void on_bool_impl(std::string_view s) { throw std::runtime_error("on_bool_impl: Has not been overridden."); } virtual void on_big_number_impl(std::string_view s) @@ -136,6 +147,10 @@ protected: { throw std::runtime_error("on_verbatim_string_impl: Has not been overridden."); } virtual void on_blob_string_impl(std::string_view s = {}) { throw std::runtime_error("on_blob_string_impl: Has not been overridden."); } + virtual void on_blob_error_impl(std::string_view s = {}) + { throw std::runtime_error("on_blob_error_impl: Has not been overridden."); } + virtual void on_streamed_string_part_impl(std::string_view s = {}) + { throw std::runtime_error("on_streamed_string_part: Has not been overridden."); } virtual void select_array_impl(int n) { throw std::runtime_error("select_array_impl: Has not been overridden."); } virtual void select_set_impl(int n) @@ -146,144 +161,29 @@ protected: { throw std::runtime_error("select_push_impl: Has not been overridden."); } public: - auto is_error() const noexcept {return error_ != error::none;} - auto get_error() const noexcept {return error_;} - auto const& error_message() const noexcept {return error_msg_;} - auto is_push() const noexcept {return is_push_;} - auto& push() noexcept {return push_;} - auto const& push() const noexcept {return push_;} - - void pop() - { aggregates_.pop_back(); } - - void on_simple_error(std::string_view s) - { - error_ = error::simple_error; - error_msg_ = s; - } - - void on_blob_error(std::string_view s = {}) - { - error_ = error::blob_error; - error_msg_ = s; - } - - void on_null() {is_null_ = true; } - - auto is_attribute() const noexcept - { - auto i = std::ssize(aggregates_) - 1; - while (aggregates_[i] != aggregate_type::attribute) - --i; - - return i != 0; - } - - void select_attribute(int n) - { - aggregates_.push_back(aggregate_type::attribute); - throw std::runtime_error("select_attribute: Has not been overridden."); - } - - void select_push(int n) - { - is_push_ = true; - aggregates_.push_back(aggregate_type::push); - } - - void select_array(int n) - { - aggregates_.push_back(aggregate_type::array); - } - - void select_set(int n) - { - aggregates_.push_back(aggregate_type::set); - } - - void select_map(int n) - { - aggregates_.push_back(aggregate_type::map); - } - - void on_simple_string(std::string_view s) - { - if (is_push_) { - push_.on_simple_string(s); - return; - } - - on_simple_string_impl(s); - } - - void on_number(std::string_view s) - { - if (is_push_) { - push_.on_number(s); - return; - } - - on_number_impl(s); - } - - void on_double(std::string_view s) - { - if (is_push_) { - push_.on_double(s); - return; - } - - on_double_impl(s); - } - - void on_bool(std::string_view s) - { - if (is_push_) { - push_.on_bool(s); - return; - } - - on_bool_impl(s); - } - - void on_big_number(std::string_view s) - { - if (is_push_) { - push_.on_big_number(s); - return; - } - - on_big_number_impl(s); - } - - void on_verbatim_string(std::string_view s = {}) - { - if (is_push_) { - push_.on_verbatim_string(s); - return; - } - - on_verbatim_string_impl(s); - } - - void on_blob_string(std::string_view s = {}) - { - if (is_push_) { - push_.on_blob_string(s); - return; - } - - on_blob_string_impl(s); - } - - virtual void on_streamed_string_part(std::string_view s = {}) - { throw std::runtime_error("on_streamed_string_part: Has not been overridden."); } + void pop() {aggregates_.pop_back();} + void select_attribute(int n) { aggregates_.push_back(aggregate_type::attribute); } + void select_push(int n) { aggregates_.push_back(aggregate_type::push); } + void select_array(int n) { aggregates_.push_back(aggregate_type::array); } + void select_set(int n) { aggregates_.push_back(aggregate_type::set); } + void select_map(int n) { aggregates_.push_back(aggregate_type::map); } + void on_simple_error(std::string_view s) { on_simple_error_impl(s); } + void on_blob_error(std::string_view s = {}) { on_blob_error_impl(s); } + void on_null() {on_null_impl(); } + void on_simple_string(std::string_view s) { on_simple_string_impl(s); } + void on_number(std::string_view s) { on_number_impl(s); } + void on_double(std::string_view s) { on_double_impl(s); } + void on_bool(std::string_view s) { on_bool_impl(s); } + void on_big_number(std::string_view s) { on_big_number_impl(s); } + void on_verbatim_string(std::string_view s = {}) { on_verbatim_string_impl(s); } + void on_blob_string(std::string_view s = {}) { on_blob_string_impl(s); } + void on_streamed_string_part(std::string_view s = {}) { on_streamed_string_part_impl(s); } virtual ~response_base() {} }; -template -class response_number : public response_base { +template +class response_number : public response_base { static_assert(std::is_integral::value); private: void on_number_impl(std::string_view s) override @@ -296,13 +196,20 @@ public: template< class CharT = char, class Traits = std::char_traits, - class Allocator = std::allocator, - class Push = push> -class response_blob_string : public response_base { + class Allocator = std::allocator> +class response_blob_string : public response_base { private: - void on_blob_string_impl(std::string_view s) override + void add(std::string_view s) { from_string_view(s, result); } + void on_blob_string_impl(std::string_view s) override + { add(s); } + void on_blob_error_impl(std::string_view s) override + { add(s); } + void on_simple_string_impl(std::string_view s) override + { add(s); } + void on_simple_error_impl(std::string_view s) override + { add(s); } public: std::basic_string result; }; @@ -310,24 +217,29 @@ public: template< class CharT = char, class Traits = std::char_traits, - class Allocator = std::allocator, - class Push = push> -class response_simple_string : public response_base { + class Allocator = std::allocator + > +class response_simple_string : public response_base { private: - void on_simple_string_impl(std::string_view s) override + void add(std::string_view s) { from_string_view(s, result); } + void on_simple_string_impl(std::string_view s) override + { add(s); } + void on_simple_error_impl(std::string_view s) override + { add(s); } + public: std::basic_string result; }; // Big number use strings at the moment as the underlying storage. -template< +template < class CharT = char, class Traits = std::char_traits, - class Allocator = std::allocator, - class Push = push> -class response_big_number : public response_base { + class Allocator = std::allocator + > +class response_big_number : public response_base { private: void on_big_number_impl(std::string_view s) override { from_string_view(s, result); } @@ -337,12 +249,12 @@ public: }; // TODO: Use a double instead of string. -template< +template < class CharT = char, class Traits = std::char_traits, - class Allocator = std::allocator, - class Push = push> -class response_double : public response_base { + class Allocator = std::allocator + > +class response_double : public response_base { private: void on_double_impl(std::string_view s) override { from_string_view(s, result); } @@ -353,9 +265,8 @@ public: template < class T, - class Allocator = std::allocator, - class Push = push> -class response_list : public response_base { + class Allocator = std::allocator> +class response_list : public response_base { private: void on_blob_string_impl(std::string_view s) override { @@ -373,9 +284,9 @@ public: template< class CharT = char, class Traits = std::char_traits, - class Allocator = std::allocator, - class Push = push> -class response_verbatim_string : public response_base { + class Allocator = std::allocator + > +class response_verbatim_string : public response_base { private: void on_verbatim_string_impl(std::string_view s) override { from_string_view(s, result); } @@ -386,20 +297,22 @@ public: template< class CharT = char, class Traits = std::char_traits, - class Allocator = std::allocator, - class Push = push> -struct response_streamed_string : response_base { - std::basic_string result; - void on_streamed_string_part(std::string_view s) override + class Allocator = std::allocator + > +class response_streamed_string : public response_base { +private: + void on_streamed_string_part_impl(std::string_view s) override { result += s; } +public: + std::basic_string result; }; -template< +template < class Key, class Compare = std::less, - class Allocator = std::allocator, - class Push = push> -class response_set : public response_base { + class Allocator = std::allocator + > +class response_set : public response_base { private: void add(std::string_view s) { @@ -416,8 +329,7 @@ public: std::set result; }; -template -class response_bool : public response_base { +class response_bool : public response_base { private: void on_bool_impl(std::string_view s) override { @@ -436,9 +348,9 @@ public: template< class Key, class Compare = std::less, - class Allocator = std::allocator, - class Push = push> -class response_unordered_set : public response_base { + class Allocator = std::allocator + > +class response_unordered_set : public response_base { private: void on_blob_string_impl(std::string_view s) override { @@ -456,9 +368,9 @@ public: template < class T, - class Allocator = std::allocator, - class Push = push> -class response_array : public response_base { + class Allocator = std::allocator + > +class response_array : public response_base { private: void add(std::string_view s = {}) { @@ -478,14 +390,10 @@ private: void select_set_impl(int n) override { } void select_map_impl(int n) override { } void select_push_impl(int n) override { } + void on_streamed_string_part_impl(std::string_view s = {}) override { add(s); } public: std::vector result; - - void clear() { result.clear(); } - auto size() const noexcept { return std::size(result); } - - void on_streamed_string_part(std::string_view s = {}) override { add(s); } }; template > @@ -494,11 +402,8 @@ using response_flat_map = response_array; template > using response_flat_set = response_array; -template < - class T, - std::size_t N, - class Push = push> -class response_static_array : public response_base { +template +class response_static_array : public response_base { private: int i = 0; void on_blob_string_impl(std::string_view s) override @@ -510,9 +415,9 @@ public: template < class T, - std::size_t N, - class Push = push> -class response_static_flat_map : public response_base { + std::size_t N + > +class response_static_flat_map : public response_base { private: int i = 0; @@ -529,5 +434,17 @@ public: std::array result; }; +struct responses { + response_flat_map push; + response_simple_string simple_string; + response_blob_string blob_string; + response_flat_map map; + response_array array; + response_set set; + response_number number; + response_depth1 depth1; + std::queue trans; +}; + } // resp } // aedis diff --git a/tests/general.cpp b/tests/general.cpp index 53a49922..dd7f5974 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -321,11 +321,9 @@ net::awaitable simple_error() { std::string cmd {"-Error\r\n"}; test_tcp_socket ts {cmd}; - resp::response_base res; + resp::response_simple_string res; co_await resp::async_read(ts, buffer, res); - check_equal(res.error_message(), {"Error"}, "simple_error (message)"); - check_equal(res.is_error(), true, "is_error"); - check_equal(res.get_error(), resp::error::simple_error, "simple_error"); + check_equal(res.result, {"Error"}, "simple_error (message)"); } } @@ -384,19 +382,17 @@ net::awaitable blob_error() { std::string cmd {"!21\r\nSYNTAX invalid syntax\r\n"}; test_tcp_socket ts {cmd}; - resp::response_base res; + resp::response_blob_string res; co_await resp::async_read(ts, buffer, res); - check_equal(res.error_message(), {"SYNTAX invalid syntax"}, "blob_error (message)"); - check_equal(res.get_error(), resp::error::blob_error, "blob_error (enum)"); + check_equal(res.result, {"SYNTAX invalid syntax"}, "blob_error (message)"); } { std::string cmd {"!0\r\n\r\n"}; test_tcp_socket ts {cmd}; - resp::response_base res; + resp::response_blob_string res; co_await resp::async_read(ts, buffer, res); - check_equal(res.error_message(), {}, "blob_error (empty message)"); - check_equal(res.get_error(), resp::error::blob_error, "blob_error (enum)"); + check_equal(res.result, {}, "blob_error (empty message)"); } }