diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 83ebfdd6..475faa66 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -1,62 +1,61 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +#include #include -#include +#include + +#include namespace net = aedis::net; - -namespace net = aedis::net; -using tcp = net::ip::tcp; -using tcp_socket = net::use_awaitable_t<>::as_default_on_t; - -namespace this_coro = net::this_coro; - -using namespace net; using namespace aedis; +using tcp = net::ip::tcp; -void fill(resp::request& req) +void fill1(resp::request& req) { - req.multi(); - req.hello(); + req.ping(); + //req.multi(); req.rpush("list", {1, 2, 3}); req.lrange("list"); - req.exec(); - req.quit(); + //req.exec(); + req.ping(); } net::awaitable example() { try { - auto ex = co_await this_coro::executor; - + auto ex = co_await net::this_coro::executor; tcp::resolver resv(ex); - tcp_socket socket {ex}; - co_await net::async_connect(socket, resv.resolve("127.0.0.1", "6379")); + auto const r = resv.resolve("127.0.0.1", "6379"); + tcp::socket socket {ex}; + co_await async_connect(socket, r, net::use_awaitable); - resp::request req; - fill(req); - co_await net::async_write(socket, net::buffer(req.payload)); + auto reqs = resp::make_request_queue(); + resp::response_buffers resps; + resp::receiver_print recv{resps}; + net::steady_timer st{ex}; + + co_spawn(ex, resp::async_reader(socket, reqs, resps, recv), net::detached); + resp::async_writer(socket, reqs, st, net::detached); + queue_writer(reqs, fill1, st); + + net::steady_timer timer(ex, std::chrono::years{1}); + co_await timer.async_wait(net::use_awaitable); - std::string buffer; - for (;;) { - resp::response_array hello; - co_await resp::async_read(socket, buffer, hello); - resp::print(hello.result); - } } catch (std::exception const& e) { - std::cerr << e.what() << std::endl; + std::cout << e.what() << std::endl; } } int main() { - io_context ioc {1}; - co_spawn(ioc, example(), detached); + net::io_context ioc {1}; + co_spawn(ioc, example(), net::detached); ioc.run(); } + diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp index 9e0d66f0..8ffc8125 100644 --- a/examples/async_pubsub.cpp +++ b/examples/async_pubsub.cpp @@ -7,6 +7,7 @@ #include #include +#include #include @@ -37,7 +38,7 @@ operator<<(std::ostream& os, myevent e) return os; } -auto fill_req(resp::request& req) +auto filler(resp::request& req) { //req.subscribe("channel"); //req.subscribe("__keyspace@0__:user:*"); @@ -68,10 +69,19 @@ auto fill_req(resp::request& req) //req.del("eee"); } +void fill1(resp::request& req) +{ + req.multi(); + req.rpush("list", {1, 2, 3}); + req.lrange("list"); + req.exec(); + req.ping(); +} + net::awaitable subscriber() { - auto ex = co_await net::this_coro::executor; try { + auto ex = co_await net::this_coro::executor; tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp::socket socket {ex}; @@ -84,15 +94,13 @@ net::awaitable subscriber() co_spawn( ex, - resp::async_reader(socket, reqs, recv, resps), + resp::async_reader(socket, reqs, resps, recv), net::detached); resp::async_writer(socket, reqs, st, net::detached); - auto filler = [](auto& req){fill_req(req);}; - for (;;) { - queue_writer(reqs, filler, st); + queue_writer(reqs, fill1, st); net::steady_timer timer(ex, std::chrono::milliseconds{1000}); co_await timer.async_wait(net::use_awaitable); } diff --git a/include/aedis/read.hpp b/include/aedis/read.hpp index 3635d38b..8a189408 100644 --- a/include/aedis/read.hpp +++ b/include/aedis/read.hpp @@ -253,17 +253,24 @@ auto async_read_type( stream); } +struct receiver_ignore { + template + void receive(response_id const&) {} + template + void receive_transaction(std::queue>) {} +}; + template < class AsyncReadWriteStream, - class Receiver, class Event, - class ResponseBuffer> + class ResponseBuffer, + class Receiver = receiver_ignore> net::awaitable async_reader( AsyncReadWriteStream& socket, std::queue>& reqs, - Receiver& recv, - ResponseBuffer& resps) + ResponseBuffer& resps, + Receiver recv = receiver_ignore{}) { using response_id_type = response_id; @@ -284,15 +291,14 @@ async_reader( // The next two ifs are used to deal with transactions. if (is_multi || (!trans_empty && !is_exec)) { - // The multi commands always gets a "OK" response and all other - // commands get QUEUED unless the user is e.g. using wrong data types. - auto const* res = cmd == command::multi ? "OK" : "QUEUED"; - - response_static_string tmp; + response_static_string<6> tmp; co_await async_read(socket, buffer, tmp, net::use_awaitable); - // Failing to QUEUE a command inside a trasaction is considered an - // application error. + // Failing to QUEUE a command inside a trasaction is + // considered an application error. The multi commands + // always gets a "OK" response and all other commands get + // QUEUED unless the user is e.g. using wrong data types. + auto const* res = cmd == command::multi ? "OK" : "QUEUED"; assert (tmp.result == res); // Pushes the command in the transction command queue that will be @@ -344,6 +350,7 @@ async_reader( buffer, *tmp, net::use_awaitable); + recv.receive(id); if (t != type::push) @@ -361,30 +368,7 @@ async_reader( } } -struct receiver_ignore { - template - void receive_transaction(std::queue>) { } - template - void receive(response_id const&) { } -}; - -struct receiver_print { - // The ids in the queue parameter have an unspecified message type. - template - void receive_transaction(std::queue> ids) - { - while (!std::empty(ids)) { - std::cout << ids.front() << std::endl; - ids.pop(); - } - } - - template - void receive(response_id const& id) - { std::cout << id << std::endl; } -}; - -template +template std::queue> make_request_queue() { diff --git a/include/aedis/receiver_print.hpp b/include/aedis/receiver_print.hpp new file mode 100644 index 00000000..d2b9dc36 --- /dev/null +++ b/include/aedis/receiver_print.hpp @@ -0,0 +1,94 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include + +#include "type.hpp" +#include "utils.hpp" +#include "response.hpp" +#include "request.hpp" + +namespace aedis { namespace resp { + +class receiver_print { +private: + response_buffers& buffer_; + +public: + receiver_print(response_buffers& buffer) + : buffer_{buffer} + {} + + // The ids in the queue parameter have an unspecified message type. + template + void receive_transaction(std::queue> ids) + { + while (!std::empty(ids)) { + std::cout << ids.front() << std::endl; + ids.pop(); + } + } + + template + void receive(response_id const& id) + { + buffer_.tree().clear(); + + std::cout << id; + switch (id.t) { + case type::push: + buffer_.push().clear(); + break; + case type::set: + buffer_.set().clear(); + break; + case type::map: + buffer_.map().clear(); + break; + case type::attribute: + buffer_.attribute().clear(); + break; + case type::array: + buffer_.array().clear(); + break; + case type::simple_error: + buffer_.simple_error().clear(); + break; + case type::simple_string: + buffer_.simple_string().clear(); + break; + case type::number: + break; + case type::double_type: + break; + case type::big_number: + buffer_.big_number().clear(); + break; + case type::boolean: + break; + case type::blob_error: + buffer_.blob_error().clear(); + break; + case type::blob_string: + buffer_.blob_string().clear(); + break; + case type::verbatim_string: + buffer_.verbatim_string().clear(); + break; + case type::streamed_string_part: + buffer_.streamed_string_part().clear(); + break; + default:{} + } + std::cout << std::endl; + } +}; + +} +} diff --git a/include/aedis/response.hpp b/include/aedis/response.hpp index a729b353..01a55f1d 100644 --- a/include/aedis/response.hpp +++ b/include/aedis/response.hpp @@ -39,11 +39,6 @@ inline void from_string_view(std::string_view s, std::string& r) { r = s; } -/* A base class for flat responses which means response with no - * embedded types in themselves. For exaple, a transaction with an - * lrange in it will produce a response that is an array with an - * array. That is not suitable for this class. - */ class response_base { protected: virtual void on_simple_string_impl(std::string_view s) @@ -100,7 +95,8 @@ public: virtual ~response_base() {} }; -struct response_ignore : response_base { +class response_ignore : public response_base { +private: void on_simple_string_impl(std::string_view s) override {} void on_simple_error_impl(std::string_view s) override {} void on_number_impl(std::string_view s) override {} @@ -119,8 +115,8 @@ struct response_ignore : response_base { void select_attribute_impl(int n) override {} }; -// This response type is able to deal with recursive redis responses as in a -// transaction for example. +// This response type is able to deal with recursive redis responses +// as in a transaction for example. class response_tree: public response_base { public: struct elem { @@ -183,9 +179,9 @@ public: }; template -class response_number : public response_base { - static_assert(std::is_integral::value); +class response_basic_number : public response_base { private: + static_assert(std::is_integral::value); void on_number_impl(std::string_view s) override { from_string_view(s, result); } @@ -193,53 +189,74 @@ public: T result; }; +using response_number = response_basic_number; + template< class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator> -class response_blob_string : public response_base { +class response_basic_blob_string : public response_base { private: - void add(std::string_view s) - { from_string_view(s, result); } - void on_blob_string_impl(std::string_view s) override - { add(s); } - void on_blob_error_impl(std::string_view s) override - { add(s); } - void on_simple_string_impl(std::string_view s) override - { add(s); } - void on_simple_error_impl(std::string_view s) override - { add(s); } + { from_string_view(s, result); } public: std::basic_string result; }; +using response_blob_string = response_basic_blob_string; + +template< + class CharT = char, + class Traits = std::char_traits, + class Allocator = std::allocator> +class response_basic_blob_error : public response_base { +private: + void on_blob_error_impl(std::string_view s) override + { from_string_view(s, result); } +public: + std::basic_string result; +}; + +using response_blob_error = response_basic_blob_error; + template< class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator > -class response_simple_string : public response_base { +class response_basic_simple_string : public response_base { private: - void add(std::string_view s) - { from_string_view(s, result); } - void on_simple_string_impl(std::string_view s) override - { add(s); } + { from_string_view(s, result); } +public: + std::basic_string result; +}; + +using response_simple_string = response_basic_simple_string; + +template< + class CharT = char, + class Traits = std::char_traits, + class Allocator = std::allocator + > +class response_basic_simple_error : public response_base { +private: void on_simple_error_impl(std::string_view s) override - { add(s); } + { from_string_view(s, result); } public: std::basic_string result; }; +using response_simple_error = response_basic_simple_error; + // Big number use strings at the moment as the underlying storage. template < class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator > -class response_big_number : public response_base { +class response_basic_big_number : public response_base { private: void on_big_number_impl(std::string_view s) override { from_string_view(s, result); } @@ -248,13 +265,15 @@ public: std::basic_string result; }; +using response_big_number = response_basic_big_number; + // TODO: Use a double instead of string. template < class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator > -class response_double : public response_base { +class response_basic_double : public response_base { private: void on_double_impl(std::string_view s) override { from_string_view(s, result); } @@ -263,6 +282,8 @@ public: std::basic_string result; }; +using response_double = response_basic_double; + template < class T, class Allocator = std::allocator> @@ -286,7 +307,7 @@ template< class Traits = std::char_traits, class Allocator = std::allocator > -class response_verbatim_string : public response_base { +class response_basic_verbatim_string : public response_base { private: void on_verbatim_string_impl(std::string_view s) override { from_string_view(s, result); } @@ -294,12 +315,14 @@ public: std::basic_string result; }; +using response_verbatim_string = response_basic_verbatim_string; + template< class CharT = char, class Traits = std::char_traits, class Allocator = std::allocator > -class response_streamed_string : public response_base { +class response_basic_streamed_string : public response_base { private: void on_streamed_string_part_impl(std::string_view s) override { result += s; } @@ -307,6 +330,8 @@ public: std::basic_string result; }; +using response_streamed_string = response_basic_streamed_string; + template < class Key, class Compare = std::less, @@ -379,6 +404,7 @@ private: result.emplace_back(std::move(r)); } + // TODO: Call vector reserver. void on_simple_string_impl(std::string_view s) override { add(s); } void on_number_impl(std::string_view s) override { add(s); } void on_double_impl(std::string_view s) override { add(s); } @@ -405,16 +431,16 @@ using response_flat_set = response_array; template class response_static_array : public response_base { private: - int i = 0; + int i_ = 0; void on_blob_string_impl(std::string_view s) override - { from_string_view(s, result[i++]); } + { from_string_view(s, result[i_++]); } void select_array_impl(int n) override { } public: std::array result; }; -template +template class response_static_string : public response_base { private: void add(std::string_view s) @@ -434,10 +460,10 @@ template < > class response_static_flat_map : public response_base { private: - int i = 0; + int i_ = 0; void add(std::string_view s = {}) - { from_string_view(s, result.at(i++)); } + { from_string_view(s, result.at(i_++)); } void on_blob_string_impl(std::string_view s) override { add(s); } void on_number_impl(std::string_view s) override @@ -474,18 +500,21 @@ private: response_array set_; response_array map_; response_array attribute_; - response_simple_string simple_string_; - response_simple_string simple_error_; - response_number number_; - response_double double_; - response_big_number big_number_; - response_blob_string blob_string_; - response_blob_string blob_error_; - response_verbatim_string verbatim_string_; - response_streamed_string streamed_string_part_; + response_simple_string simple_string_; + response_simple_error simple_error_; + response_number number_; + response_double double_; + response_bool boolean_; + response_big_number big_number_; + response_blob_string blob_string_; + response_blob_error blob_error_; + response_verbatim_string verbatim_string_; + response_streamed_string streamed_string_part_; response_ignore ignore_; public: + auto& tree() {return tree_.result;}; + auto& array() {return array_.result;}; auto const& array() const noexcept {return array_.result;}; @@ -510,6 +539,9 @@ public: auto& number() {return number_.result;}; auto const& number() const noexcept {return number_.result;}; + auto& boolean() {return boolean_.result;}; + auto const& boolean() const noexcept {return boolean_.result;}; + auto& double_type() {return double_.result;}; auto const& double_type() const noexcept {return double_.result;}; @@ -528,10 +560,6 @@ public: auto& streamed_string_part() {return streamed_string_part_.result;}; auto const& streamed_string_part() const noexcept {return streamed_string_part_.result;}; - // TODO: The types bellow are still missing. - //null - //boolean - // When the id is from a transaction the type of the message is not // specified. template @@ -540,7 +568,7 @@ public: if (id.cmd == command::exec) return &tree_; - switch (id.type) { + switch (id.t) { case type::push: return &push_; case type::set: return &set_; case type::map: return &map_; @@ -551,6 +579,7 @@ public: case type::number: return &number_; case type::double_type: return &double_; case type::big_number: return &big_number_; + case type::boolean: return &boolean_; case type::blob_error: return &blob_error_; case type::blob_string: return &blob_string_; case type::verbatim_string: return &verbatim_string_; @@ -570,7 +599,7 @@ operator<<(std::ostream& os, aedis::resp::response_id const& id) os << std::left << std::setw(15) << aedis::resp::to_string(id.cmd) << std::left << std::setw(20) << aedis::resp::to_string(id.t) - << std::left << std::setw(20) << id.event + << std::left << std::setw(20) << (int)id.event ; return os; } diff --git a/tests/general.cpp b/tests/general.cpp index dd7f5974..bf022a10 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -64,9 +64,9 @@ net::awaitable test_list() } { // rpush - resp::response_number res; + resp::response_number res; co_await resp::async_read(socket, buffer, res); - check_equal(res.result, 6, "rpush"); + check_equal(res.result, (long long int)6, "rpush"); } { // lrange @@ -229,25 +229,25 @@ net::awaitable number() { // int std::string cmd {":-3\r\n"}; test_tcp_socket ts {cmd}; - resp::response_number res; + resp::response_number res; co_await resp::async_read(ts, buffer, res); - check_equal(res.result, -3, "number (int)"); + check_equal(res.result, (long long int)-3, "number (int)"); } { // unsigned std::string cmd {":3\r\n"}; test_tcp_socket ts {cmd}; - resp::response_number res; + resp::response_number res; co_await resp::async_read(ts, buffer, res); - check_equal(res.result, 3, "number (unsigned)"); + check_equal(res.result, (long long int)3, "number (unsigned)"); } { // std::size_t std::string cmd {":1111111\r\n"}; test_tcp_socket ts {cmd}; - resp::response_number res; + resp::response_number res; co_await resp::async_read(ts, buffer, res); - check_equal(res.result, 1111111, "number (std::size_t)"); + check_equal(res.result, (long long int)1111111, "number (std::size_t)"); } } @@ -321,7 +321,7 @@ net::awaitable simple_error() { std::string cmd {"-Error\r\n"}; test_tcp_socket ts {cmd}; - resp::response_simple_string res; + resp::response_simple_error res; co_await resp::async_read(ts, buffer, res); check_equal(res.result, {"Error"}, "simple_error (message)"); } @@ -382,7 +382,7 @@ net::awaitable blob_error() { std::string cmd {"!21\r\nSYNTAX invalid syntax\r\n"}; test_tcp_socket ts {cmd}; - resp::response_blob_string res; + resp::response_blob_error res; co_await resp::async_read(ts, buffer, res); check_equal(res.result, {"SYNTAX invalid syntax"}, "blob_error (message)"); } @@ -390,7 +390,7 @@ net::awaitable blob_error() { std::string cmd {"!0\r\n\r\n"}; test_tcp_socket ts {cmd}; - resp::response_blob_string res; + resp::response_blob_error res; co_await resp::async_read(ts, buffer, res); check_equal(res.result, {}, "blob_error (empty message)"); }