diff --git a/Makefile.am b/Makefile.am index 679f0b94..b74e8332 100644 --- a/Makefile.am +++ b/Makefile.am @@ -67,15 +67,20 @@ basic3_SOURCES = $(top_srcdir)/examples/basic3.cpp basic3_CPPFLAGS = $(MY_CPPFLAGS) basic3_LDADD = $(MY_LDADD) -noinst_PROGRAMS += stl_containers -stl_containers_SOURCES = $(top_srcdir)/examples/stl_containers.cpp -stl_containers_CPPFLAGS = $(MY_CPPFLAGS) -stl_containers_LDADD = $(MY_LDADD) +noinst_PROGRAMS += containers +containers_SOURCES = $(top_srcdir)/examples/containers.cpp +containers_CPPFLAGS = $(MY_CPPFLAGS) +containers_LDADD = $(MY_LDADD) -noinst_PROGRAMS += fast1 -fast1_SOURCES = $(top_srcdir)/examples/fast1.cpp -fast1_CPPFLAGS = $(MY_CPPFLAGS) -fast1_LDADD = $(MY_LDADD) +noinst_PROGRAMS += custom_response1 +custom_response1_SOURCES = $(top_srcdir)/examples/custom_response1.cpp +custom_response1_CPPFLAGS = $(MY_CPPFLAGS) +custom_response1_LDADD = $(MY_LDADD) + +noinst_PROGRAMS += custom_response2 +custom_response2_SOURCES = $(top_srcdir)/examples/custom_response2.cpp +custom_response2_CPPFLAGS = $(MY_CPPFLAGS) +custom_response2_LDADD = $(MY_LDADD) noinst_PROGRAMS += test test_SOURCES = $(top_srcdir)/tests/general.cpp diff --git a/examples/advanced.cpp b/examples/advanced.cpp index 67471631..c465fc17 100644 --- a/examples/advanced.cpp +++ b/examples/advanced.cpp @@ -55,54 +55,54 @@ writer(tcp_socket& socket, std::queue& reqs, std::string message net::awaitable reader(tcp_socket& socket, std::queue& reqs) { - // Auxiliary buffer. - std::string buffer; - // Writes and reads continuosly from the socket. - for (;;) { + for (std::string buffer;;) { // Writes the first request in queue and all subsequent - // ones that have no response. + // ones that have no response e.g. subscribe. do { co_await async_write(socket, reqs.front()); - // Some commands don't have a response or their responses - // are push types. In such cases we should pop them from - // queue. + // Pops the request if no response is expected. if (std::empty(reqs.front().commands)) reqs.pop(); } while (!std::empty(reqs) && std::empty(reqs.front().commands)); - // Keeps reading while there is no message to be sent. + // Keeps reading while there is no messages queued waiting to be sent. do { - // We have to consume the responses to all commands in the - // request. + // Loops to consume the response to all commands in the request. do { - // Reads the response to one command. - resp3::response resp; - co_await async_read(socket, buffer, resp); - if (resp.get_type() == resp3::type::push) { - // Server push. + // Reads the type of the incoming response. + auto const t = co_await resp3::async_read_type(socket, buffer); + + if (t == resp3::type::push) { + resp3::response resp; + co_await async_read(socket, buffer, resp); std::cout << resp << std::endl; } else { // Prints the command and the response to it. switch (reqs.front().commands.front()) { case command::hello: { + resp3::response resp; + co_await async_read(socket, buffer, resp); + for (auto i = 0; i < 100; ++i) { std::string msg = "Writer "; msg += std::to_string(i); co_spawn(socket.get_executor(), writer(socket, reqs, msg), net::detached); } } break; + default: + { + resp3::response resp; + co_await async_read(socket, buffer, resp); - default: {} + std::cout + << reqs.front().commands.front() << ":\n" + << resp << std::endl; + } } - - std::cout - << reqs.front().commands.front() << ":\n" - << resp << std::endl; - // Done with this command, pop. reqs.front().commands.pop(); } diff --git a/examples/basic1.cpp b/examples/basic1.cpp index 1c6aa645..420d3c9f 100644 --- a/examples/basic1.cpp +++ b/examples/basic1.cpp @@ -15,30 +15,44 @@ using namespace aedis; -/* A very simple example to illustrate the basic principles. It adds - * three commands to the request and reads the response one after the - * other. +/** A simple example that illustrates the basic principles. Three commands are + * sent in the same request * - * Notice the responses are read in the same object for - * simplification. + * 1. hello (always required) + * 2. ping + * 3. quit + * + * The responses are then read in sequence. For simplification we read all + * responses on the same object. */ net::awaitable ping() { - auto socket = co_await make_connection(); + try { + resp3::request req; + req.push(command::hello, 3); + req.push(command::ping); + req.push(command::quit); - resp3::request req; - req.push(command::hello, 3); - req.push(command::ping); - req.push(command::quit); - co_await async_write(socket, req); + auto socket = co_await make_connection(); + co_await async_write(socket, req); - std::string buffer; - resp3::response resp; - co_await async_read(socket, buffer, resp); - co_await async_read(socket, buffer, resp); - co_await async_read(socket, buffer, resp); + std::string buffer; + resp3::response resp; - std::cout << resp << std::endl; + // hello + co_await async_read(socket, buffer, resp); + + // ping + co_await async_read(socket, buffer, resp); + + // quit + co_await async_read(socket, buffer, resp); + + std::cout << resp << std::endl; + + } catch (std::exception const& e) { + std::cerr << e.what() << std::endl; + } } int main() diff --git a/examples/basic2.cpp b/examples/basic2.cpp index 8056b94a..7e632937 100644 --- a/examples/basic2.cpp +++ b/examples/basic2.cpp @@ -21,12 +21,12 @@ using namespace aedis; */ net::awaitable ping() { - auto socket = co_await make_connection(); - resp3::request req; req.push(command::hello, 3); req.push(command::ping); req.push(command::quit); + + auto socket = co_await make_connection(); co_await async_write(socket, req); std::string buffer; diff --git a/examples/basic3.cpp b/examples/basic3.cpp index 094efdfb..866eeb0c 100644 --- a/examples/basic3.cpp +++ b/examples/basic3.cpp @@ -14,12 +14,14 @@ using namespace aedis; -/* A more elaborate way sending requests where a new request is sent - * only after the last one has arrived. This can be used as a starting - * point for more complex applications. +/* A slightly more elaborate way dealing with requests and responses. * - * We also separate the application logic out out the coroutine for + * This time we send the ping + quit only after the hello command has + * arrived. We also separate the application logic out the coroutine for * clarity. + * + * This can be used as a starting point for more complex applications. + * */ // Adds a new element in the queue if necessary. @@ -41,9 +43,6 @@ void process_response( case command::hello: prepare_next(reqs); reqs.back().push(command::ping); - break; - case command::ping: - prepare_next(reqs); reqs.back().push(command::quit); break; default: {}; @@ -52,14 +51,13 @@ void process_response( net::awaitable ping() { - auto socket = co_await make_connection(); - - std::string buffer; - std::queue reqs; reqs.push({}); reqs.back().push(command::hello, 3); + auto socket = co_await make_connection(); + std::string buffer; + while (!std::empty(reqs)) { co_await async_write(socket, reqs.front()); while (!std::empty(reqs.front().commands)) { diff --git a/examples/fast1.cpp b/examples/fast1.cpp deleted file mode 100644 index ac9edb9f..00000000 --- a/examples/fast1.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include -#include - -#include - -#include "types.hpp" -#include "utils.ipp" - -using namespace aedis; - -/* A very simple example to illustrate the basic principles. It adds - * three commands to the request and reads the response one after the - * other. - * - * Notice the responses are read in the same object for - * simplification. - */ -net::awaitable fast1() -{ - auto socket = co_await make_connection(); - - resp3::request req; - req.push(command::hello, 3); - req.push(command::ping); - req.push(command::quit); - co_await async_write(socket, req); - - std::string buffer; - resp3::response resp; - co_await async_read(socket, buffer, resp); - co_await async_read(socket, buffer, resp); - co_await async_read(socket, buffer, resp); - - std::cout << resp << std::endl; -} - -int main() -{ - net::io_context ioc; - co_spawn(ioc, fast1(), net::detached); - ioc.run(); -} diff --git a/examples/pubsub.cpp b/examples/pubsub.cpp index 6c743dac..0e62a917 100644 --- a/examples/pubsub.cpp +++ b/examples/pubsub.cpp @@ -14,17 +14,17 @@ using namespace aedis; -/** A simple coroutine used to pusblish on a channel and exit. +/** Publisher: A coroutine that will pusblish on two channels and exit. */ net::awaitable publisher() { - auto socket = co_await make_connection(); - resp3::request req; req.push(command::hello, 3); req.push(command::publish, "channel1", "Message to channel1"); req.push(command::publish, "channel2", "Message to channel2"); req.push(command::quit); + + auto socket = co_await make_connection(); co_await async_write(socket, req); std::string buffer; @@ -34,13 +34,16 @@ net::awaitable publisher() co_await async_read(socket, buffer, ignore); } +/** Subscriber: Will subscribe to two channels and listen for messages + * indefinitely. + */ net::awaitable subscriber() { - auto socket = co_await make_connection(); - resp3::request req; req.push(command::hello, "3"); req.push(command::subscribe, "channel1", "channel2"); + + auto socket = co_await make_connection(); co_await async_write(socket, req); std::string buffer; diff --git a/examples/stl_containers.cpp b/examples/stl_containers.cpp deleted file mode 100644 index df8b711d..00000000 --- a/examples/stl_containers.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include -#include -#include -#include -#include - -#include - -#include "types.hpp" -#include "utils.ipp" - -using namespace aedis; - -net::awaitable stl_containers() -{ - // Helper function to get a connected socket. - auto socket = co_await make_connection(); - - std::vector vec - {1, 2, 3, 4, 5, 6}; - - std::set set - {"one", "two", "three"}; - - std::map map - { {"key1", "value1"} - , {"key2", "value2"} - , {"key3", "value3"} - }; - - resp3::request req; - req.push(command::hello, 3); - req.push(command::flushall); - req.push_range(command::rpush, "vector", std::cbegin(vec), std::cend(vec)); - req.push_range(command::sadd, "set", std::cbegin(set), std::cend(set)); - req.push_range(command::hset, "map", std::cbegin(map), std::cend(map)); - co_await async_write(socket, req); - - std::string buffer; - while (!std::empty(req.commands)) { - resp3::response resp; - co_await async_read(socket, buffer, resp); - std::cout << req.commands.front() << ":\n" << resp << std::endl; - req.commands.pop(); - } -} - -int main() -{ - net::io_context ioc; - co_spawn(ioc, stl_containers(), net::detached); - ioc.run(); -} diff --git a/include/aedis/resp3/detail/impl/parser.ipp b/include/aedis/resp3/detail/impl/parser.ipp index 6c150013..49449ff5 100644 --- a/include/aedis/resp3/detail/impl/parser.ipp +++ b/include/aedis/resp3/detail/impl/parser.ipp @@ -12,10 +12,34 @@ namespace aedis { namespace resp3 { namespace detail { -// Converts a decimal number in ascii format to an integer. -long long length(char const* p) + +type to_type(char c) { - long long len = 0; + switch (c) { + case '!': return type::blob_error; + case '=': return type::verbatim_string; + case '$': return type::blob_string; + case ';': return type::streamed_string_part; + case '-': return type::simple_error; + case ':': return type::number; + case ',': return type::doublean; + case '#': return type::boolean; + case '(': return type::big_number; + case '+': return type::simple_string; + case '_': return type::null; + case '>': return type::push; + case '~': return type::set; + case '*': return type::array; + case '|': return type::attribute; + case '%': return type::map; + default: return type::invalid; + } +} + +// Converts a decimal number in ascii format to an integer. +std::size_t length(char const* p) +{ + std::size_t len = 0; while (*p != '\r') { len = (10 * len) + (*p - '0'); p++; @@ -37,106 +61,84 @@ void parser::init(response_base* res) sizes_[4] = 1; sizes_[5] = 1; sizes_[6] = 1; - bulk_ = bulk_type::none; - bulk_length_ = std::numeric_limits::max(); -} - -void parser::on_null() -{ - res_->add(type::null, 1, depth_); - --sizes_[depth_]; -} - -void parser::on_bulk(parser::bulk_type b, std::string_view s) -{ - switch (b) { - case bulk_type::blob_error: res_->add(type::blob_error, 1, depth_, s); break; - case bulk_type::verbatim_string: res_->add(type::verbatim_string, 1, depth_, s); break; - case bulk_type::blob_string: res_->add(type::blob_string, 1, depth_, s); break; - case bulk_type::streamed_string_part: - { - if (std::empty(s)) { - sizes_[depth_] = 1; - } else { - res_->add(type::streamed_string_part, 1, depth_, s); - } - } break; - default: assert(false); - } - - --sizes_[depth_]; -} - -void parser::on_blob(char const* data, parser::bulk_type b) -{ - bulk_length_ = length(data + 1); - bulk_ = b; -} - -void parser::on_blob_string(char const* data) -{ - if (*(data + 1) == '?') { - sizes_[++depth_] = std::numeric_limits::max(); - return; - } - - on_blob(data, bulk_type::blob_string); -} - -void parser::on_data(type t, char const* data, std::size_t n) -{ - --sizes_[depth_]; - res_->add(t, 1, depth_, {data + 1, n - 3}); -} - -void parser::on_aggregate(type t, char const* data) -{ - auto const l = length(data + 1); - if (l == 0) { - --sizes_[depth_]; - res_->add(t, 0, depth_); - return; - } - - int counter; - switch (t) { - case type::map: - case type::attribute: - { - counter = 2; - } break; - default: counter = 1; - } - - res_->add(t, l, depth_, {}); - sizes_[++depth_] = counter * l; + bulk_ = type::invalid; + bulk_length_ = std::numeric_limits::max(); } std::size_t parser::advance(char const* data, std::size_t n) { - if (bulk_ != bulk_type::none) { + if (bulk_ != type::invalid) { n = bulk_length_ + 2; - on_bulk(bulk_, {data, (std::size_t)bulk_length_}); - bulk_ = bulk_type::none; + switch (bulk_) { + case type::streamed_string_part: + { + if (bulk_length_ == 0) { + sizes_[depth_] = 1; + } else { + res_->add(bulk_, 1, depth_, data, bulk_length_); + } + } break; + default: res_->add(bulk_, 1, depth_, data, bulk_length_); + } + + bulk_ = type::invalid; + --sizes_[depth_]; + } else if (sizes_[depth_] != 0) { - switch (*data) { - case '!': on_blob(data, bulk_type::blob_error); break; - case '=': on_blob(data, bulk_type::verbatim_string); break; - case '$': on_blob_string(data); break; - case ';': on_blob(data, bulk_type::streamed_string_part); break; - case '-': on_data(type::simple_error, data, n); break; - case ':': on_data(type::number, data, n); break; - case ',': on_data(type::doublean, data, n); break; - case '#': on_data(type::boolean, data, n); break; - case '(': on_data(type::big_number, data, n); break; - case '+': on_data(type::simple_string, data, n); break; - case '_': on_null(); break; - case '>': on_aggregate(type::push, data); break; - case '~': on_aggregate(type::set, data); break; - case '*': on_aggregate(type::array, data); break; - case '|': on_aggregate(type::attribute, data); break; - case '%': on_aggregate(type::map, data); break; - default: assert(false); + auto const t = to_type(*data); + switch (t) { + case type::blob_error: + case type::verbatim_string: + case type::streamed_string_part: + { + bulk_length_ = length(data + 1); + bulk_ = t; + } break; + case type::blob_string: + { + if (*(data + 1) == '?') { + sizes_[++depth_] = std::numeric_limits::max(); + } else { + bulk_length_ = length(data + 1); + bulk_ = type::blob_string; + } + } break; + case type::simple_error: + case type::number: + case type::doublean: + case type::boolean: + case type::big_number: + case type::simple_string: + { + res_->add(t, 1, depth_, data + 1, n - 3); + --sizes_[depth_]; + } break; + case type::null: + { + res_->add(type::null, 1, depth_); + --sizes_[depth_]; + } break; + case type::push: + case type::set: + case type::array: + case type::attribute: + case type::map: + { + auto const l = length(data + 1); + res_->add(t, l, depth_); + + if (l == 0) { + --sizes_[depth_]; + } else { + auto const m = element_multiplicity(t); + sizes_[++depth_] = m * l; + } + } break; + default: + { + // TODO: This should cause an error not an assert. + assert(false); + } } } diff --git a/include/aedis/resp3/detail/parser.hpp b/include/aedis/resp3/detail/parser.hpp index 1be82784..06dc1970 100644 --- a/include/aedis/resp3/detail/parser.hpp +++ b/include/aedis/resp3/detail/parser.hpp @@ -17,35 +17,30 @@ namespace detail { // resp3 parser. class parser { -public: - enum class bulk_type - { blob_error - , verbatim_string - , blob_string - , streamed_string_part - , none - }; - private: response_base* res_; - int depth_; - int sizes_[6]; // Streaming will require a bigger integer. - bulk_type bulk_; - int bulk_length_; + std::size_t depth_; + std::size_t sizes_[6]; + std::size_t bulk_length_; + type bulk_; void init(response_base* res); - void on_aggregate(type t, char const* data); - void on_null(); - void on_data(type t, char const* data, std::size_t n); - void on_bulk(bulk_type b, std::string_view s = {}); - void on_blob(char const* data, bulk_type b); - void on_blob_string(char const* data); public: parser(response_base* res); + + // Returns the number of bytes in data that have been consumed. std::size_t advance(char const* data, std::size_t n); - auto done() const noexcept { return depth_ == 0 && bulk_ == bulk_type::none; } + + // returns true when the parser is done with the current message. + auto done() const noexcept + { return depth_ == 0 && bulk_ == type::invalid; } + + // The bulk type expected in the next read. If none is expected returns + // type::invalid. auto bulk() const noexcept { return bulk_; } + + // The lenght of the next expected bulk_length. auto bulk_length() const noexcept { return bulk_length_; } }; @@ -74,7 +69,7 @@ public: { switch (start_) { for (;;) { - if (parser_.bulk() == detail::parser::bulk_type::none) { + if (parser_.bulk() == type::invalid) { case 1: start_ = 0; net::async_read_until( diff --git a/include/aedis/resp3/impl/type.ipp b/include/aedis/resp3/impl/type.ipp index c1c3750b..4ca13791 100644 --- a/include/aedis/resp3/impl/type.ipp +++ b/include/aedis/resp3/impl/type.ipp @@ -55,5 +55,14 @@ bool is_aggregate(type t) } } +std::size_t element_multiplicity(type t) +{ + switch (t) { + case type::map: + case type::attribute: return 2ULL; + default: return 1ULL; + } +} + } // resp3 } // aedis diff --git a/include/aedis/resp3/read.hpp b/include/aedis/resp3/read.hpp index c41da66d..127dfccc 100644 --- a/include/aedis/resp3/read.hpp +++ b/include/aedis/resp3/read.hpp @@ -31,7 +31,7 @@ auto read( detail::parser p {&res}; std::size_t n = 0; do { - if (p.bulk() == detail::parser::bulk_type::none) { + if (p.bulk() == type::invalid) { n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec); if (ec || n < 3) return n; diff --git a/include/aedis/resp3/response.hpp b/include/aedis/resp3/response.hpp index 31cdb678..5acd5980 100644 --- a/include/aedis/resp3/response.hpp +++ b/include/aedis/resp3/response.hpp @@ -99,8 +99,8 @@ public: */ size_type size() const noexcept; - void add(type t, int n, int depth, std::string_view s = {}) override - { data_.emplace_back(n, depth, t, std::string{s}); } + void add(type t, std::size_t n, std::size_t depth, char const* data = nullptr, std::size_t size = 0) override + { data_.emplace_back(n, depth, t, std::string{data, size}); } }; /// Equality comparison for a node. diff --git a/include/aedis/resp3/response_base.hpp b/include/aedis/resp3/response_base.hpp index 3558782c..623def70 100644 --- a/include/aedis/resp3/response_base.hpp +++ b/include/aedis/resp3/response_base.hpp @@ -19,7 +19,7 @@ namespace resp3 { * Users are allowed to override this class to customize responses. */ struct response_base { - virtual void add(type t, int n, int depth, std::string_view s = {}) {} + virtual void add(type t, std::size_t n, std::size_t depth, char const* data = nullptr, std::size_t size = 0) {} /** Virtual destructor to allow inheritance. */ diff --git a/include/aedis/resp3/type.hpp b/include/aedis/resp3/type.hpp index 1f508ab1..2e8dbe90 100644 --- a/include/aedis/resp3/type.hpp +++ b/include/aedis/resp3/type.hpp @@ -67,5 +67,11 @@ std::ostream& operator<<(std::ostream& os, type t); /// Returns true if the data type is an aggregate. bool is_aggregate(type t); +/** @brief Returns the element multilicity. + * + * For type map and attribute this value is 2, all other types have 1. +*/ +std::size_t element_multiplicity(type t); + } // resp3 } // aedis diff --git a/include/aedis/resp3/write.hpp b/include/aedis/resp3/write.hpp index 74478c09..edc68aa3 100644 --- a/include/aedis/resp3/write.hpp +++ b/include/aedis/resp3/write.hpp @@ -28,7 +28,7 @@ write( static_assert(boost::beast::is_sync_write_stream::value, "SyncWriteStream type requirements not met"); - return write(stream, net::buffer(req.payload), ec); + return net::write(stream, net::buffer(req.payload), ec); } template diff --git a/tests/basic_flat_array_adapter.hpp b/tests/basic_flat_array_adapter.hpp index 4775b58d..c1660986 100644 --- a/tests/basic_flat_array_adapter.hpp +++ b/tests/basic_flat_array_adapter.hpp @@ -26,13 +26,15 @@ struct basic_flat_array_adapter : response_base { basic_flat_array_adapter(basic_flat_array* p) : result(p) {} - void add(type t, int n, int depth, std::string_view s = {}) override + void add(type t, int n, int depth, char const* data = nullptr, std::size_t size = 0) override { if (is_aggregate(t)) { i = 0; result->resize(n); } else { - from_string_view(s, result->at(i)); + auto r = std::from_chars(data, data + size, result->at(i)); + if (r.ec == std::errc::invalid_argument) + throw std::runtime_error("from_chars: Unable to convert"); ++i; } }