diff --git a/examples/async_all_hashes.cpp b/examples/async_all_hashes.cpp index ebbee906..90f81ff5 100644 --- a/examples/async_all_hashes.cpp +++ b/examples/async_all_hashes.cpp @@ -17,31 +17,43 @@ namespace this_coro = net::this_coro; using namespace net; using namespace aedis; +struct foo { + std::string id {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; + std::string from {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; + std::string nick {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; + std::string avatar {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; + std::string description {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; + std::string location {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; + std::string product {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; + std::string details {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; + std::string values {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; +}; + +auto make_hset_arg(foo const& p) +{ + std::vector> v; + v.push_back({"id", p.id}); + v.push_back({"from", p.from}); + v.push_back({"nick", p.nick}); + v.push_back({"avatar", p.avatar}); + v.push_back({"description", p.description}); + v.push_back({"location", p.location}); + v.push_back({"product", p.product}); + v.push_back({"details", p.details}); + v.push_back({"values", p.values}); + return v; +} + +// tcp::resolver::results_type const& r net::awaitable create_hashes() { - std::map map1 - { {{"Name"}, {"Marcelo"}} - , {{"Education"}, {"Physics"}} - , {{"Job"}, {"Programmer"}} - }; - - std::map map2 - { {{"Name"}, {"Lae"}} - , {{"Education"}, {"Engineer"}} - , {{"Job"}, {"Engineer"}} - }; - - std::map map3 - { {{"Name"}, {"Louis"}} - , {{"Education"}, {"Nene"}} - , {{"Job"}, {"Nene"}} - }; - + std::vector posts(20000); resp::pipeline p; p.flushall(); - p.hset("user:map1", map1); - p.hset("user:map2", map2); - p.hset("user:map3", map3); + for (auto i = 0; i < std::ssize(posts); ++i) { + std::string const name = "posts:" + std::to_string(i); + p.hset(name, make_hset_arg(posts[i])); + } p.quit(); auto ex = co_await this_coro::executor; @@ -57,10 +69,10 @@ net::awaitable create_hashes() co_await resp::async_read(socket, buffer, res); } -net::awaitable read_hashes() +net::awaitable read_hashes_coro() { resp::pipeline p; - p.keys("user:*"); + p.keys("posts:*"); auto ex = co_await this_coro::executor; tcp::resolver resv(ex); @@ -73,7 +85,7 @@ net::awaitable read_hashes() resp::response_array keys; co_await resp::async_read(socket, buffer, keys); - print(keys.result); + //print(keys.result); // Generates the pipeline to retrieve all hashes. resp::pipeline pv; @@ -86,20 +98,55 @@ net::awaitable read_hashes() for (auto const& key : keys.result) { resp::response_array value; co_await resp::async_read(socket, buffer, value); - print(value.result); + //print(value.result); } resp::response quit; co_await resp::async_read(socket, buffer, quit); } +void read_hashes(net::io_context& ioc) +{ + resp::pipeline p; + p.keys("posts:*"); + + tcp::resolver resv(ioc); + auto const r = resv.resolve("127.0.0.1", "6379"); + tcp::socket socket {ioc}; + net::connect(socket, r); + net::write(socket, net::buffer(p.payload)); + + std::string buffer; + + resp::response_array keys; + resp::read(socket, buffer, keys); + + // Generates the pipeline to retrieve all hashes. + resp::pipeline pv; + for (auto const& o : keys.result) + pv.hvals(o); + pv.quit(); + + net::write(socket, net::buffer(pv.payload)); + + for (auto const& key : keys.result) { + resp::response_array value; + resp::read(socket, buffer, value); + } + + resp::response quit; + resp::read(socket, buffer, quit); +} + int main() { io_context ioc {1}; co_spawn(ioc, create_hashes(), detached); ioc.run(); ioc.restart(); - co_spawn(ioc, read_hashes(), detached); + co_spawn(ioc, read_hashes_coro(), detached); ioc.run(); + ioc.restart(); + read_hashes(ioc); } diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 4db6380b..d77daadd 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -32,7 +32,7 @@ net::awaitable example1() std::string buffer; for (;;) { - resp::response_string res; + resp::response_simple_string res; co_await resp::async_read(socket, buffer, res); std::cout << res.result << std::endl; } diff --git a/examples/async_reconnect.cpp b/examples/async_reconnect.cpp index 0125d380..69299d64 100644 --- a/examples/async_reconnect.cpp +++ b/examples/async_reconnect.cpp @@ -35,7 +35,7 @@ net::awaitable example1() std::string buffer; for (;;) { - resp::response_string res; + resp::response_simple_string res; co_await resp::async_read(socket, buffer, res); std::cout << res.result << std::endl; } diff --git a/examples/sync_basic.cpp b/examples/sync_basic.cpp index 65558047..eef22440 100644 --- a/examples/sync_basic.cpp +++ b/examples/sync_basic.cpp @@ -27,7 +27,7 @@ int main() std::string buffer; for (;;) { - resp::response_string res; + resp::response_simple_string res; resp::read(socket, buffer, res); std::cout << res.result << std::endl; } diff --git a/examples/sync_responses.cpp b/examples/sync_responses.cpp index ebf8da49..5733670c 100644 --- a/examples/sync_responses.cpp +++ b/examples/sync_responses.cpp @@ -35,7 +35,7 @@ int main() resp::read(socket, buffer, list); print(list.result); - resp::response_string ok; + resp::response_simple_string ok; resp::read(socket, buffer, ok); std::cout << ok.result << std::endl; diff --git a/include/aedis/pipeline.hpp b/include/aedis/pipeline.hpp index b6013bf2..b3ddcf71 100644 --- a/include/aedis/pipeline.hpp +++ b/include/aedis/pipeline.hpp @@ -289,15 +289,16 @@ public: resp::assemble(payload, "SET", {key}, std::cbegin(args), std::cend(args)); } - auto - hset(std::string const& key, - std::initializer_list l, - Event e = Event::ignore) - { - resp::assemble(payload, "HSET", {key}, std::cbegin(l), std::cend(l)); - events.push(e); - } + //auto + //hset(std::string const& key, + // std::initializer_list l, + // Event e = Event::ignore) + //{ + // resp::assemble(payload, "HSET", {key}, std::cbegin(l), std::cend(l)); + // events.push(e); + //} + // TODO: Find a way to assert the value type is a pair. template auto hset(std::string const& key, Range const& r, Event e = Event::ignore) diff --git a/include/aedis/resp.hpp b/include/aedis/resp.hpp index cd625102..5107d449 100644 --- a/include/aedis/resp.hpp +++ b/include/aedis/resp.hpp @@ -44,22 +44,22 @@ long long length(char const* p) return len; } +enum class bulk_type +{ blob_error +, verbatim_string +, blob_string +, streamed_string_part +, none +}; + template class parser { public: - enum class bulk - { blob_error - , verbatim_string - , blob_string - , streamed_string_part - , none - }; - private: Response* res_ = nullptr; int depth_ = 0; int sizes_[6] = {2, 1, 1, 1, 1, 1}; // Streaming will require a bigger integer. - bulk bulk_ = bulk::none; + bulk_type bulk_ = bulk_type::none; int bulk_length_ = std::numeric_limits::max(); auto on_array_impl(char const* data, int m = 1) @@ -91,7 +91,10 @@ private: { res_->select_attribute(on_array_impl(data, 2)); } void on_null() - { res_->on_null(); --sizes_[depth_]; } + { + res_->on_null(); + --sizes_[depth_]; + } auto handle_simple_string(char const* data, std::size_t n) { @@ -117,13 +120,13 @@ private: void on_big_number(char const* data, std::size_t n) { res_->on_big_number(handle_simple_string(data, n)); } - void on_bulk(bulk b, std::string_view s = {}) + void on_bulk(bulk_type b, std::string_view s = {}) { switch (b) { - case bulk::blob_error: res_->on_blob_error(s); break; - case bulk::verbatim_string: res_->on_verbatim_string(s); break; - case bulk::blob_string: res_->on_blob_string(s); break; - case bulk::streamed_string_part: + case bulk_type::blob_error: res_->on_blob_error(s); break; + case bulk_type::verbatim_string: res_->on_verbatim_string(s); break; + case bulk_type::blob_string: res_->on_blob_string(s); break; + case bulk_type::streamed_string_part: { if (std::empty(s)) { sizes_[depth_] = 1; @@ -137,35 +140,29 @@ private: --sizes_[depth_]; } - auto on_blob_error_impl(char const* data, bulk b) + auto on_blob_error_impl(char const* data, bulk_type b) { - auto const l = length(data + 1); - if (l == -1 || l == 0) { - on_bulk(b); - return bulk::none; - } - - bulk_length_ = l; + bulk_length_ = length(data + 1); return b; } auto on_streamed_string_size(char const* data) - { return on_blob_error_impl(data, bulk::streamed_string_part); } + { return on_blob_error_impl(data, bulk_type::streamed_string_part); } auto on_blob_error(char const* data) - { return on_blob_error_impl(data, bulk::blob_error); } + { return on_blob_error_impl(data, bulk_type::blob_error); } auto on_verbatim_string(char const* data) - { return on_blob_error_impl(data, bulk::verbatim_string); } + { return on_blob_error_impl(data, bulk_type::verbatim_string); } auto on_blob_string(char const* data) { if (*(data + 1) == '?') { sizes_[++depth_] = std::numeric_limits::max(); - return bulk::none; + return bulk_type::none; } - return on_blob_error_impl(data, bulk::blob_string); + return on_blob_error_impl(data, bulk_type::blob_string); } public: @@ -175,8 +172,8 @@ public: std::size_t advance(char const* data, std::size_t n) { - auto next = bulk::none; - if (bulk_ != bulk::none) { + auto next = bulk_type::none; + if (bulk_ != bulk_type::none) { n = bulk_length_ + 2; on_bulk(bulk_, {data, (std::size_t)bulk_length_}); } else { @@ -200,7 +197,8 @@ public: case '%': on_map(data); break; default: assert(false); } - } + } else { + } } while (sizes_[depth_] == 0) @@ -211,7 +209,7 @@ public: } auto done() const noexcept - { return depth_ == 0 && bulk_ == bulk::none; } + { return depth_ == 0 && bulk_ == bulk_type::none; } auto bulk() const noexcept { return bulk_; } @@ -263,7 +261,7 @@ public: { switch (start_) { for (;;) { - if (parser_.bulk() == parser::bulk::none) { + if (parser_.bulk() == bulk_type::none) { case 1: start_ = 0; net::async_read_until( @@ -286,10 +284,11 @@ public: start_ = 0; auto const s = std::ssize(*buf_); auto const l = parser_.bulk_length(); + auto const to_read = static_cast(l + 2 - s); buf_->resize(l + 2); net::async_read( stream_, - net::buffer(buf_->data() + s, l + 2 - s), + net::buffer(buf_->data() + s, to_read), net::transfer_all(), std::move(self)); return; @@ -297,13 +296,10 @@ public: default: { - // The condition below is wrong. it must be n < 3 for case 1 - // and n < 2 for the async_read. - if (ec || n < 3) + if (ec) return self.complete(ec); n = parser_.advance(buf_->data(), n); - buf_->erase(0, n); if (parser_.done()) return self.complete({}); @@ -326,7 +322,7 @@ auto read( parser p {&res}; std::size_t n = 0; do { - if (p.bulk() == parser::bulk::none) { + if (p.bulk() == bulk_type::none) { n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec); if (ec || n < 3) return n; @@ -335,14 +331,15 @@ auto read( auto const l = p.bulk_length(); if (s < (l + 2)) { buf.resize(l + 2); - n = net::read(stream, net::buffer(buf.data() + s, l + 2 - s)); - if (ec || n < 2) + auto const to_read = static_cast(l + 2 - s); + n = net::read(stream, net::buffer(buf.data() + s, to_read)); + assert(n >= to_read); + if (ec) return n; } } n = p.advance(buf.data(), n); - buf.erase(0, n); } while (!p.done()); diff --git a/include/aedis/response.hpp b/include/aedis/response.hpp index 8654441d..5c75d116 100644 --- a/include/aedis/response.hpp +++ b/include/aedis/response.hpp @@ -68,9 +68,27 @@ private: error err_ = error::none; bool is_null_ = false; std::string err_msg_; + bool is_attr_ = false; + +protected: + virtual void on_simple_string_impl(std::string_view s) + { throw std::runtime_error("on_simple_string_impl: Has not been overridden."); } + virtual void on_number_impl(std::string_view s) + { throw std::runtime_error("on_number_impl: Has not been overridden."); } + virtual void on_double_impl(std::string_view s) + { throw std::runtime_error("on_double_impl: Has not been overridden."); } + virtual void on_bool_impl(std::string_view s) + { throw std::runtime_error("on_bool_impl: Has not been overridden."); } + virtual void on_big_number_impl(std::string_view s) + { throw std::runtime_error("on_big_number_impl: Has not been overridden."); } + virtual void on_verbatim_string_impl(std::string_view s = {}) + { throw std::runtime_error("on_verbatim_string_impl: Has not been overridden."); } + virtual void on_blob_string_impl(std::string_view s = {}) + { throw std::runtime_error("on_blob_string_impl: Has not been overridden."); } public: - // Error is dealt with in the base class. + std::vector attribute; + auto get_error() const noexcept {return err_;} auto const& message() const noexcept {return err_msg_;} @@ -88,21 +106,86 @@ public: void on_null() {is_null_ = true; } + void select_attribute(int n) { is_attr_ = true; } + // Function derived classes can overwrite. virtual void select_push(int n) { throw std::runtime_error("select_push: Has not been overridden."); } - virtual void select_attribute(int n) { throw std::runtime_error("select_attribute: Has not been overridden."); } - virtual void select_array(int n) { throw std::runtime_error("select_array: Has not been overridden."); } virtual void select_set(int n) { throw std::runtime_error("select_set: Has not been overridden."); } virtual void select_map(int n) { throw std::runtime_error("select_map: Has not been overridden."); } - virtual void on_simple_string(std::string_view s) { throw std::runtime_error("on_simple_string: Has not been overridden."); } - virtual void on_number(std::string_view s) { throw std::runtime_error("on_number: Has not been overridden."); } - virtual void on_double(std::string_view s) { throw std::runtime_error("on_double: Has not been overridden."); } - virtual void on_bool(std::string_view s) { throw std::runtime_error("on_bool: Has not been overridden."); } - virtual void on_big_number(std::string_view s) { throw std::runtime_error("on_big_number: Has not been overridden."); } - virtual void on_verbatim_string(std::string_view s = {}) { throw std::runtime_error("on_verbatim_string: Has not been overridden."); } - virtual void on_blob_string(std::string_view s = {}) { throw std::runtime_error("on_blob_string: Has not been overridden."); } + void on_simple_string(std::string_view s) + { + if (is_attr_) { + //attribute.push_back(s); + return; + } + + on_simple_string_impl(s); + } + + void on_number(std::string_view s) + { + if (is_attr_) { + //attribute.push_back(s); + return; + } + + on_number_impl(s); + } + + void on_double(std::string_view s) + { + if (is_attr_) { + //attribute.push_back(s); + return; + } + + on_double_impl(s); + } + + void on_bool(std::string_view s) + { + if (is_attr_) { + //attribute.push_back(s); + return; + } + + on_bool_impl(s); + } + + void on_big_number(std::string_view s) + { + if (is_attr_) { + //attribute.push_back(s); + return; + } + + on_big_number_impl(s); + } + + void on_verbatim_string(std::string_view s = {}) + { + if (is_attr_) { + //attribute.push_back(s); + return; + } + + on_verbatim_string_impl(s); + } + + void on_blob_string(std::string_view s = {}) + { + if (is_attr_) { + //attribute.push_back(s); + return; + } + + on_blob_string_impl(s); + } + + // At the moment, I don't see any reason for why we should support + // attributes and pushes and push types. virtual void on_streamed_string_part(std::string_view s = {}) { throw std::runtime_error("on_streamed_string_part: Has not been overridden."); } virtual ~response_base() {} }; @@ -120,45 +203,40 @@ void from_string_view(std::string_view s, std::string& r) { r = s; } template -struct response_number : response_base { +class response_number : public response_base { static_assert(std::is_integral::value); +private: + void on_number_impl(std::string_view s) override + { from_string_view(s, result); } + +public: T result; - void on_number(std::string_view s) override - { from_string_view(s, result); } -}; - -template< - class CharT, - class Traits = std::char_traits, - class Allocator = std::allocator> -struct response_basic_string : response_base { - std::basic_string result; - - void on_simple_string(std::string_view s) override { from_string_view(s, result); } - void on_verbatim_string(std::string_view s) override { from_string_view(s, result); } - void on_blob_string(std::string_view s) override { from_string_view(s, result); } -}; - -using response_string = response_basic_string; - -template< - class CharT = char, - class Traits = std::char_traits, - class Allocator = std::allocator> -struct response_blob_string : response_base { - std::basic_string result; - void on_blob_string(std::string_view s) override - { from_string_view(s, result); } }; template< class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator> -struct response_simple_string : response_base { - std::basic_string result; - void on_simple_string(std::string_view s) override +class response_blob_string : public response_base { +private: + void on_blob_string_impl(std::string_view s) override { from_string_view(s, result); } + +public: + std::basic_string result; +}; + +template< + class CharT = char, + class Traits = std::char_traits, + class Allocator = std::allocator> +class response_simple_string : public response_base { +private: + void on_simple_string_impl(std::string_view s) override + { from_string_view(s, result); } + +public: + std::basic_string result; }; // Big number use strings at the moment as the underlying storage. @@ -166,10 +244,13 @@ template< class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator> -struct response_big_number : response_base { - std::basic_string result; - void on_big_number(std::string_view s) override +class response_big_number : public response_base { +private: + void on_big_number_impl(std::string_view s) override { from_string_view(s, result); } + +public: + std::basic_string result; }; // TODO: Use a double instead of string. @@ -177,34 +258,42 @@ template< class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator> -struct response_double : response_base { - std::basic_string result; - void on_double(std::string_view s) override +class response_double : public response_base { +private: + void on_double_impl(std::string_view s) override { from_string_view(s, result); } + +public: + std::basic_string result; }; template < class T, class Allocator = std::allocator> -struct response_list : response_base { - std::list result; - void select_array(int n) override { } - void on_blob_string(std::string_view s) override +class response_list : public response_base { +private: + void on_blob_string_impl(std::string_view s) override { T r; from_string_view(s, r); result.push_back(std::move(r)); } + +public: + void select_array(int n) override { } + std::list result; }; template< class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator> -struct response_verbatim_string : response_base { - std::basic_string result; - void on_verbatim_string(std::string_view s) override +class response_verbatim_string : public response_base { +private: + void on_verbatim_string_impl(std::string_view s) override { from_string_view(s, result); } +public: + std::basic_string result; }; template< @@ -230,16 +319,17 @@ private: result.insert(std::end(result), std::move(r)); } + void on_simple_string_impl(std::string_view s) override { add(s); } + void on_blob_string_impl(std::string_view s) override { add(s); } + public: std::set result; void select_set(int n) override { } - void on_blob_string(std::string_view s) override { add(s); } - void on_simple_string(std::string_view s) override { add(s); } }; -struct response_bool : response_base { - bool result; - void on_bool(std::string_view s) override +class response_bool : public response_base { +private: + void on_bool_impl(std::string_view s) override { if (std::ssize(s) != 1) { // We can't hadle an error in redis. @@ -248,26 +338,32 @@ struct response_bool : response_base { result = s[0] == 't'; } + +public: + bool result; }; template< class Key, class Compare = std::less, class Allocator = std::allocator> -struct response_unordered_set : response_base { - std::set result; - void select_array(int n) override { } - void select_set(int n) override { } - void on_blob_string(std::string_view s) override +class response_unordered_set : public response_base { +private: + void on_blob_string_impl(std::string_view s) override { Key r; from_string_view(s, r); result.insert(std::end(result), std::move(r)); } + +public: + std::set result; + void select_array(int n) override { } + void select_set(int n) override { } }; template > -struct response_array : response_base { +class response_array : public response_base { private: void add(std::string_view s = {}) { @@ -276,6 +372,14 @@ private: result.emplace_back(std::move(r)); } + void on_simple_string_impl(std::string_view s) override { add(s); } + void on_number_impl(std::string_view s) override { add(s); } + void on_double_impl(std::string_view s) override { add(s); } + void on_bool_impl(std::string_view s) override { add(s); } + void on_big_number_impl(std::string_view s) override { add(s); } + void on_verbatim_string_impl(std::string_view s = {}) override { add(s); } + void on_blob_string_impl(std::string_view s = {}) override { add(s); } + public: std::vector result; @@ -286,15 +390,7 @@ public: void select_push(int n) override { } void select_set(int n) override { } void select_map(int n) override { } - void select_attribute(int n) override { } - void on_simple_string(std::string_view s) override { add(s); } - void on_number(std::string_view s) override { add(s); } - void on_double(std::string_view s) override { add(s); } - void on_bool(std::string_view s) override { add(s); } - void on_big_number(std::string_view s) override { add(s); } - void on_verbatim_string(std::string_view s = {}) override { add(s); } - void on_blob_string(std::string_view s = {}) override { add(s); } void on_streamed_string_part(std::string_view s = {}) override { add(s); } }; diff --git a/tests/general.cpp b/tests/general.cpp index ea73c827..198418d2 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -55,7 +55,7 @@ net::awaitable test_list() } { // flushall - resp::response_string res; + resp::response_simple_string res; co_await resp::async_read(socket, buffer, res); check_equal(res.result, {"OK"}, "flushall"); } @@ -79,19 +79,19 @@ net::awaitable test_list() } { // ltrim - resp::response_string res; + resp::response_simple_string res; co_await resp::async_read(socket, buffer, res); check_equal(res.result, {"OK"}, "ltrim"); } { // lpop. Why a blob string instead of a number? - resp::response_string res; + resp::response_blob_string res; co_await resp::async_read(socket, buffer, res); check_equal(res.result, {"3"}, "lpop"); } { // quit - resp::response_string res; + resp::response_simple_string res; co_await resp::async_read(socket, buffer, res); check_equal(res.result, {"OK"}, "quit"); } @@ -204,7 +204,7 @@ net::awaitable offline() , "*0\r\n" , "$2\r\nhh\r\n" , "$26\r\nhhaa\aaaa\raaaaa\r\naaaaaaaaaa\r\n" - , "$0\r\n" + , "$0\r\n\r\n" , "-Error\r\n" , ",1.23\r\n" , ",inf\r\n"