From 589181d4eba279070c031eeadf31ef5e5084b028 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 9 Jan 2022 16:26:38 +0100 Subject: [PATCH] Ports read functions to use the dynamic_buffer. --- examples/hashes.cpp | 23 ++++---- examples/intro.cpp | 19 +++---- examples/key_expiration.cpp | 23 ++++---- examples/lists.cpp | 23 ++++---- examples/nested_response.cpp | 21 +++---- examples/response_adapter.cpp | 19 ++++--- examples/response_queue.cpp | 18 +++--- examples/serialization.cpp | 19 ++++--- examples/sets.cpp | 24 ++++---- examples/subscriber.cpp | 15 ++--- examples/sync.cpp | 12 ++-- include/aedis/resp3/client_base.hpp | 13 +++-- include/aedis/resp3/detail/read_ops.hpp | 62 ++++++++------------- include/aedis/resp3/read.hpp | 37 +++++++------ tests/general.cpp | 74 ++++++++++++------------- tools/commands.cpp | 15 ++--- 16 files changed, 204 insertions(+), 213 deletions(-) diff --git a/examples/hashes.cpp b/examples/hashes.cpp index a4afc1a0..6461fbcb 100644 --- a/examples/hashes.cpp +++ b/examples/hashes.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -14,14 +14,15 @@ #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; +using resp3::serializer; +using resp3::adapt; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /* Shows how to serialize and read redis hashes in C++ containers. */ @@ -56,13 +57,13 @@ net::awaitable containers() // Reads the responses. std::string buffer; - co_await async_read(socket, buffer); // hello - co_await async_read(socket, buffer); // flushall - co_await async_read(socket, buffer, adapt(hset)); - co_await async_read(socket, buffer, adapt(hgetall1)); - co_await async_read(socket, buffer, adapt(hgetall2)); - co_await async_read(socket, buffer, adapt(hgetall3)); - co_await async_read(socket, buffer); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // hello + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // flushall + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(hset)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(hgetall1)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(hgetall2)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(hgetall3)); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // Prints the responses. std::cout << "hset: " << hset; diff --git a/examples/intro.cpp b/examples/intro.cpp index 86434cfa..f4a8dcfc 100644 --- a/examples/intro.cpp +++ b/examples/intro.cpp @@ -10,14 +10,15 @@ #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; +using resp3::serializer; +using resp3::adapt; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /* Illustrates the basic principles. @@ -49,16 +50,14 @@ net::awaitable ping() // Reads the responses. std::string buffer; - std::size_t n = 0; - n += co_await async_read(socket, buffer); // hello (ignored) - n += co_await async_read(socket, buffer); // flushall - n += co_await async_read(socket, buffer, adapt(ping)); - n += co_await async_read(socket, buffer, adapt(incr)); - n += co_await async_read(socket, buffer); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // hello (ignored) + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // flushall + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(ping)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(incr)); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // Print the responses. std::cout - << "Bytes read: " << n << "\n" << "ping: " << ping << "\n" << "incr: " << incr << "\n"; diff --git a/examples/key_expiration.cpp b/examples/key_expiration.cpp index e899d36a..ba18cec1 100644 --- a/examples/key_expiration.cpp +++ b/examples/key_expiration.cpp @@ -12,15 +12,16 @@ #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; -using aedis::resp3::node; +using resp3::serializer; +using resp3::adapt; +using resp3::node; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /* Shows how to deal with keys that may not exist. @@ -46,11 +47,11 @@ net::awaitable key_expiration() std::optional get; // Reads the responses. - std::string aux_buffer; - co_await async_read(socket, aux_buffer); // hello - co_await async_read(socket, aux_buffer); // flushall - co_await async_read(socket, aux_buffer); - co_await async_read(socket, aux_buffer, adapt(get)); + std::string rbuffer; + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // hello + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // flushall + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); + co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(get)); std::cout << "Before expiration: " << get.has_value() << ", " @@ -67,8 +68,8 @@ net::awaitable key_expiration() co_await async_write(socket, buffer(sr.request())); // Reads the response to the second request. - co_await async_read(socket, aux_buffer, adapt(get)); - co_await async_read(socket, aux_buffer); + co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(get)); + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); std::cout << "After expiration: " << get.has_value() << "\n"; diff --git a/examples/lists.cpp b/examples/lists.cpp index 67a225ad..ac5b6411 100644 --- a/examples/lists.cpp +++ b/examples/lists.cpp @@ -15,14 +15,15 @@ #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; +using resp3::serializer; +using resp3::adapt; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /* Shows how to work with redis lists. @@ -60,14 +61,14 @@ net::awaitable ping() // Reads the responses. std::string rbuffer; - co_await async_read(socket, rbuffer); // hello - co_await async_read(socket, rbuffer); // flushall - co_await async_read(socket, rbuffer, adapt(rpush)); // rpush - co_await async_read(socket, rbuffer, adapt(svec)); - co_await async_read(socket, rbuffer, adapt(slist)); - co_await async_read(socket, rbuffer, adapt(sdeq)); - co_await async_read(socket, rbuffer, adapt(ivec)); - co_await async_read(socket, rbuffer); // quit + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // hello + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // flushall + co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(rpush)); // rpush + co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(svec)); + co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(slist)); + co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(sdeq)); + co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(ivec)); + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // quit // Prints the responses. std::cout << "rpush: " << rpush; diff --git a/examples/nested_response.cpp b/examples/nested_response.cpp index 0c72d0e7..a857dfb5 100644 --- a/examples/nested_response.cpp +++ b/examples/nested_response.cpp @@ -10,15 +10,16 @@ #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; -using aedis::resp3::node; +using resp3::serializer; +using resp3::adapt; +using resp3::node; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /// Shows how to read nested responses. @@ -29,7 +30,6 @@ net::awaitable nested_response() serializer sr; sr.push(command::hello, 3); - sr.push(command::ping); sr.push(command::quit); co_await async_write(socket, buffer(sr.request())); @@ -39,14 +39,11 @@ net::awaitable nested_response() // Reads the response. std::string buffer; - co_await async_read(socket, buffer, adapt(hello)); - co_await async_read(socket, buffer, adapt(ping)); - co_await async_read(socket, buffer); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(hello)); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); - // Prints the responses. - std::cout << "hello: "; - for (auto const& e: hello) std::cout << e << " "; - std::cout << "\nPing: " << ping; + // Prints the response. + for (auto const& e: hello) std::cout << e << "\n"; } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/response_adapter.cpp b/examples/response_adapter.cpp index 33a5450d..0965477b 100644 --- a/examples/response_adapter.cpp +++ b/examples/response_adapter.cpp @@ -11,15 +11,16 @@ #include #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::type; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; +using resp3::type; +using resp3::serializer; +using resp3::adapt; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /* In the serialization.cpp example we saw how to serialize and deserialize Redis responses in user custom types. When serializing @@ -64,11 +65,11 @@ net::awaitable adapter_example() // Reads the responses. std::string rbuffer; - co_await async_read(socket, rbuffer); // hello - co_await async_read(socket, rbuffer); // flushall - co_await async_read(socket, rbuffer); // rpush - co_await async_read(socket, rbuffer, myadapter{}); // lrange - co_await async_read(socket, rbuffer); // quit + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // hello + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // flushall + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // rpush + co_await resp3::async_read(socket, dynamic_buffer(rbuffer), myadapter{}); // lrange + co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // quit } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/response_queue.cpp b/examples/response_queue.cpp index fd705f5f..ec677e23 100644 --- a/examples/response_queue.cpp +++ b/examples/response_queue.cpp @@ -1,5 +1,4 @@ -/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres at gmail dot com) -/// \example basic1.cpp +/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -10,15 +9,16 @@ #include #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::node; -using aedis::resp3::adapt; +using resp3::serializer; +using resp3::node; +using resp3::adapt; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /// Processes the responses in a loop using the helper queue. net::awaitable ping() @@ -40,13 +40,13 @@ net::awaitable ping() while (!std::empty(sr.commands)) { switch (sr.commands.front()) { case command::ping: - co_await async_read(socket, buffer, adapt(ping)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(ping)); break; case command::quit: - co_await async_read(socket, buffer, adapt(quit)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(quit)); break; default: - co_await async_read(socket, buffer); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); } sr.commands.pop(); diff --git a/examples/serialization.cpp b/examples/serialization.cpp index 1ca66d87..758dcf96 100644 --- a/examples/serialization.cpp +++ b/examples/serialization.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -13,14 +13,15 @@ #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; +using resp3::serializer; +using resp3::adapt; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /* Illustrates how to serialize your own data. @@ -92,11 +93,11 @@ net::awaitable serialization() // Reads the responses. std::string buffer; - co_await async_read(socket, buffer); // hello - co_await async_read(socket, buffer); // flushall - co_await async_read(socket, buffer); // set - co_await async_read(socket, buffer, adapt(get)); - co_await async_read(socket, buffer); // quit + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // hello + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // flushall + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // set + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(get)); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // quit // Print the responses. std::cout << "get: a = " << get.a << ", b = " << get.b << "\n"; diff --git a/examples/sets.cpp b/examples/sets.cpp index 9dd3d3d9..2e8af3d8 100644 --- a/examples/sets.cpp +++ b/examples/sets.cpp @@ -14,17 +14,17 @@ #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; +using resp3::serializer; +using resp3::adapt; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; -/* Shows how to serialize and read redis sets in C++ containers. - */ +/// Shows how to serialize and read redis sets in C++ containers. net::awaitable containers() { @@ -53,13 +53,13 @@ net::awaitable containers() // Reads the responses. std::string buffer; - co_await async_read(socket, buffer); // hello - co_await async_read(socket, buffer); // flushall - co_await async_read(socket, buffer, adapt(sadd)); - co_await async_read(socket, buffer, adapt(smembers1)); - co_await async_read(socket, buffer, adapt(smembers2)); - co_await async_read(socket, buffer, adapt(smembers3)); - co_await async_read(socket, buffer); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // hello + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // flushall + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(sadd)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(smembers1)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(smembers2)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(smembers3)); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // Prints the responses. std::cout << "sadd: " << sadd; diff --git a/examples/subscriber.cpp b/examples/subscriber.cpp index da9ded82..a13a0114 100644 --- a/examples/subscriber.cpp +++ b/examples/subscriber.cpp @@ -10,15 +10,16 @@ #include #include "utils.ipp" +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::async_read; -using aedis::resp3::adapt; -using aedis::resp3::node; +using resp3::serializer; +using resp3::adapt; +using resp3::node; namespace net = aedis::net; using net::async_write; using net::buffer; +using net::dynamic_buffer; /* In previous examples we sent some commands (ping) to redis and quit (closed) the connection. In this example we send a @@ -52,8 +53,8 @@ net::awaitable subscriber() // Reads the response to the hello command. std::string buffer; - co_await async_read(socket, buffer, adapt(resp)); - co_await async_read(socket, buffer); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(resp)); + co_await resp3::async_read(socket, dynamic_buffer(buffer)); // Saves the id of this connection. auto const id = resp.at(8).data; @@ -61,7 +62,7 @@ net::awaitable subscriber() // Loops to receive server pushes. for (;;) { resp.clear(); - co_await async_read(socket, buffer, adapt(resp)); + co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(resp)); std::cout << "Subscriber " << id << ":\n" diff --git a/examples/sync.cpp b/examples/sync.cpp index 936dd264..869f18e3 100644 --- a/examples/sync.cpp +++ b/examples/sync.cpp @@ -8,16 +8,16 @@ #include #include +namespace resp3 = aedis::resp3; using aedis::command; using aedis::resp3::serializer; -using aedis::resp3::read; using aedis::resp3::adapt; using aedis::resp3::node; namespace net = aedis::net; using net::ip::tcp; -using net::write; using net::buffer; +using net::dynamic_buffer; /* Aedis supports synchronous communication too. */ @@ -35,14 +35,14 @@ int main() sr.push(command::hello, 3); sr.push(command::command); sr.push(command::quit); - write(socket, buffer(sr.request())); + net::write(socket, buffer(sr.request())); std::vector resp; std::string buffer; - read(socket, buffer); - read(socket, buffer, adapt(resp)); - read(socket, buffer); + resp3::read(socket, dynamic_buffer(buffer)); + resp3::read(socket, dynamic_buffer(buffer), adapt(resp)); + resp3::read(socket, dynamic_buffer(buffer)); std::cout << resp << std::endl; diff --git a/include/aedis/resp3/client_base.hpp b/include/aedis/resp3/client_base.hpp index 376ca8e3..89e50907 100644 --- a/include/aedis/resp3/client_base.hpp +++ b/include/aedis/resp3/client_base.hpp @@ -55,14 +55,15 @@ private: do { // Loops to consume the response to all commands in the request. do { - auto const t = co_await async_read_type(socket_, buffer); + auto const t = + co_await async_read_type(socket_, net::dynamic_buffer(buffer)); if (t == type::push) { - co_await async_read(socket_, buffer, adapt(push_resp_)); + co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapt(push_resp_)); on_push(); } else { auto adapter = adapt(*srs_.front().commands.front().resp); - co_await async_read(socket_, buffer, adapter); + co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapter); on_event(srs_.front().commands.front()); srs_.front().commands.pop(); } @@ -97,10 +98,10 @@ private: { serializer sr; sr.push(command::hello, 3); - co_await async_write(socket_, net::buffer(sr.request())); + co_await net::async_write(socket_, net::buffer(sr.request())); std::string buffer; - co_await async_read(socket_, buffer, adapt(hello_)); + co_await resp3::async_read(socket_, net::dynamic_buffer(buffer), adapt(hello_)); } // The connection manager. It keeps trying the reconnect to the @@ -113,7 +114,7 @@ private: for (;;) { tcp_resolver resolver{socket_.get_executor()}; auto const res = co_await resolver.async_resolve("127.0.0.1", "6379"); - co_await aedis::net::async_connect(socket_, res); + co_await net::async_connect(socket_, res); co_await say_hello(); diff --git a/include/aedis/resp3/detail/read_ops.hpp b/include/aedis/resp3/detail/read_ops.hpp index 5492c665..6d0ce7ca 100644 --- a/include/aedis/resp3/detail/read_ops.hpp +++ b/include/aedis/resp3/detail/read_ops.hpp @@ -20,18 +20,18 @@ namespace detail { // TODO: Use asio::coroutine. template < class AsyncReadStream, - class Buffer, + class DynamicBuffer, class ResponseAdapter> class parse_op { private: AsyncReadStream& stream_; - Buffer* buf_; + DynamicBuffer buf_; parser parser_; std::size_t consumed_; int start_; public: - parse_op(AsyncReadStream& stream, Buffer* buf, ResponseAdapter adapter) + parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter) : stream_ {stream} , buf_ {buf} , parser_ {adapter} @@ -49,33 +49,24 @@ public: if (parser_.bulk() == type::invalid) { case 1: start_ = 0; - net::async_read_until( - stream_, - net::dynamic_buffer(*buf_), - "\r\n", - std::move(self)); - + net::async_read_until(stream_, buf_, "\r\n", std::move(self)); return; } // On a bulk read we can't read until delimiter since the // payload may contain the delimiter itself so we have to // read the whole chunk. However if the bulk blob is small - // enough it may be already on the buffer buf_ we read - // last time. If it is, there is no need of initiating - // another async op otherwise we have to read the - // missing bytes. - if (std::size(*buf_) < (parser_.bulk_length() + 2)) { + // enough it may be already on the buffer (from the last + // read), in which case there is no need of initiating + // another async op, otherwise we have to read the missing + // bytes. + if (std::size(buf_) < (parser_.bulk_length() + 2)) { start_ = 0; - auto const s = std::ssize(*buf_); + auto const s = std::size(buf_); auto const l = parser_.bulk_length(); - auto const to_read = static_cast(l + 2 - s); - buf_->resize(l + 2); - net::async_read( - stream_, - net::buffer(buf_->data() + s, to_read), - net::transfer_all(), - std::move(self)); + auto const to_read = l + 2 - s; + buf_.grow(to_read); + net::async_read(stream_, buf_.data(s, to_read), net::transfer_all(), std::move(self)); return; } @@ -86,13 +77,13 @@ public: return; } - n = parser_.advance(buf_->data(), n, ec); + n = parser_.advance((char const*)buf_.data(0, n).data(), n, ec); if (ec) { self.complete(ec, 0); return; } - buf_->erase(0, n); + buf_.consume(n); consumed_ += n; if (parser_.done()) { self.complete({}, consumed_); @@ -104,19 +95,18 @@ public: } }; -template +// TODO: Use asio::coroutine. +template class type_op { private: AsyncReadStream& stream_; - Buffer* buf_ = nullptr; + DynamicBuffer buf_; public: - type_op(AsyncReadStream& stream, Buffer* buf) + type_op(AsyncReadStream& stream, DynamicBuffer buf) : stream_ {stream} , buf_ {buf} - { - assert(buf_); - } + { } template void operator()( Self& self @@ -130,17 +120,13 @@ public: return; } - if (std::empty(*buf_)) { - net::async_read_until( - stream_, - net::dynamic_buffer(*buf_), - "\r\n", - std::move(self)); + if (std::size(buf_) == 0) { + net::async_read_until(stream_, buf_, "\r\n", std::move(self)); return; } - assert(!std::empty(*buf_)); - auto const type = to_type(buf_->front()); + auto const* data = (char const*)buf_.data(0, n).data(); + auto const type = to_type(*data); self.complete(ec, type); return; } diff --git a/include/aedis/resp3/read.hpp b/include/aedis/resp3/read.hpp index 047a045b..9b32b418 100644 --- a/include/aedis/resp3/read.hpp +++ b/include/aedis/resp3/read.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -35,13 +35,13 @@ namespace resp3 { */ template < class SyncReadStream, - class Buffer, + class DynamicBuffer, class ResponseAdapter > std::size_t read( SyncReadStream& stream, - Buffer& buf, + DynamicBuffer buf, ResponseAdapter adapter, boost::system::error_code& ec) { @@ -50,7 +50,7 @@ read( std::size_t consumed = 0; do { if (p.bulk() == type::invalid) { - n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec); + n = net::read_until(stream, buf, "\r\n", ec); if (ec) return 0; @@ -62,9 +62,9 @@ read( auto const s = std::size(buf); auto const l = p.bulk_length(); if (s < (l + 2)) { - buf.resize(l + 2); - auto const to_read = static_cast(l + 2 - s); - n = net::read(stream, net::buffer(buf.data() + s, to_read)); + auto const to_read = l + 2 - s; + buf.grow(to_read); + n = net::read(stream, buf.data(s, to_read), ec); if (ec) return 0; @@ -76,11 +76,12 @@ read( } std::error_code ec; - n = p.advance(buf.data(), n, ec); + auto const* data = (char const*) buf.data(0, n).data(); + n = p.advance(data, n, ec); if (ec) return 0; - buf.erase(0, n); + buf.consume(n); consumed += n; } while (!p.done()); @@ -101,16 +102,16 @@ read( */ template< class SyncReadStream, - class Buffer, + class DynamicBuffer, class ResponseAdapter = detail::response_traits::adapter_type> std::size_t read( SyncReadStream& stream, - Buffer& buf, + DynamicBuffer buf, ResponseAdapter adapter = adapt()) { boost::system::error_code ec; - auto const n = read(stream, buf, adapter, ec); + auto const n = resp3::read(stream, buf, adapter, ec); if (ec) BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); @@ -140,13 +141,13 @@ read( */ template < class AsyncReadStream, - class Buffer, + class DynamicBuffer, class ResponseAdapter = detail::response_traits::adapter_type, class CompletionToken = net::default_completion_token_t > auto async_read( AsyncReadStream& stream, - Buffer& buffer, + DynamicBuffer buffer, ResponseAdapter adapter = adapt(), CompletionToken&& token = net::default_completion_token_t{}) @@ -154,7 +155,7 @@ auto async_read( return net::async_compose < CompletionToken , void(boost::system::error_code, std::size_t) - >(detail::parse_op {stream, &buffer, adapter}, + >(detail::parse_op {stream, buffer, adapter}, token, stream); } @@ -175,20 +176,20 @@ auto async_read( */ template < class AsyncReadStream, - class Buffer, + class DynamicBuffer, class CompletionToken = net::default_completion_token_t > auto async_read_type( AsyncReadStream& stream, - Buffer& buffer, + DynamicBuffer buffer, CompletionToken&& token = net::default_completion_token_t{}) { return net::async_compose < CompletionToken , void(boost::system::error_code, type) - >(detail::type_op {stream, &buffer}, token, stream); + >(detail::type_op {stream, buffer}, token, stream); } } // resp3 diff --git a/tests/general.cpp b/tests/general.cpp index 8e05fab4..011dc80c 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -117,7 +117,7 @@ test_general(net::ip::tcp::resolver::results_type const& res) int push_counter = 0; for (;;) { resp.clear(); - co_await resp3::async_read(socket, buffer, adapt(resp), net::use_awaitable); + co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable); if (resp.front().data_type == resp3::type::push) { switch (push_counter) { @@ -537,13 +537,13 @@ test_set(net::ip::tcp::resolver::results_type const& results) std::string buf; { // hello, flushall gresp.clear(); - co_await async_read(socket, buf, adapt(gresp)); - co_await async_read(socket, buf, adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); } { // set gresp.clear(); - co_await async_read(socket, buf, adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; check_equal(gresp, expected, "set1"); @@ -554,13 +554,13 @@ test_set(net::ip::tcp::resolver::results_type const& results) std::vector expected { {resp3::type::blob_string, 1UL, 0UL, test_bulk1} }; - co_await async_read(socket, buf, adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "get1"); } { // set gresp.clear(); - co_await async_read(socket, buf, adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; check_equal(gresp, expected, "ltrim"); @@ -570,13 +570,13 @@ test_set(net::ip::tcp::resolver::results_type const& results) gresp.clear(); std::vector expected { {resp3::type::blob_string, 1UL, 0UL, test_bulk2} }; - co_await async_read(socket, buf, adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "get2"); } { // set gresp.clear(); - co_await async_read(socket, buf, adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; check_equal(gresp, expected, "set3"); @@ -587,13 +587,13 @@ test_set(net::ip::tcp::resolver::results_type const& results) std::vector expected { {resp3::type::blob_string, 1UL, 0UL, {}} }; - co_await async_read(socket, buf, adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "get3"); } { // quit gresp.clear(); - co_await async_read(socket, buf, adapt(gresp)); + co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; check_equal(gresp, expected, "quit"); @@ -608,7 +608,7 @@ net::awaitable test_simple_string() std::string cmd {"+OK\r\n"}; test_tcp_socket ts {cmd}; gresp.clear(); - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; check_equal(gresp, expected, "simple_string"); @@ -619,7 +619,7 @@ net::awaitable test_simple_string() std::string cmd {"+\r\n"}; test_tcp_socket ts {cmd}; gresp.clear(); - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::simple_string, 1UL, 0UL, {}} }; check_equal(gresp, expected, "simple_string (empty)"); @@ -635,7 +635,7 @@ net::awaitable test_simple_string() // cmd += "\r\n"; // test_tcp_socket ts {cmd}; // resp3::detail::simple_string_adapter res; - // co_await async_read(ts, buffer, res); + // co_await resp3::async_read(ts, buffer, res); // check_equal(res.result, str, "simple_string (large)"); // //check_equal(res.attribute.value, {}, "simple_string (empty attribute)"); //} @@ -651,7 +651,7 @@ net::awaitable test_number() gresp.clear(); std::vector expected { {resp3::type::number, 1UL, 0UL, {"-3"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "number (int)"); } @@ -661,7 +661,7 @@ net::awaitable test_number() gresp.clear(); std::vector expected { {resp3::type::number, 1UL, 0UL, {"3"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "number (unsigned)"); } @@ -671,7 +671,7 @@ net::awaitable test_number() gresp.clear(); std::vector expected { {resp3::type::number, 1UL, 0UL, {"1111111"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "number (std::size_t)"); } } @@ -690,7 +690,7 @@ net::awaitable test_array() , {resp3::type::blob_string, 1UL, 1UL, {"two"}} , {resp3::type::blob_string, 1UL, 1UL, {"three"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "array"); } @@ -709,7 +709,7 @@ net::awaitable test_array() gresp.clear(); std::vector expected { {resp3::type::array, 0UL, 0UL, {}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "array (empty)"); } } @@ -724,7 +724,7 @@ net::awaitable test_blob_string() gresp.clear(); std::vector expected { {resp3::type::blob_string, 1UL, 0UL, {"hh"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "blob_string"); } @@ -734,7 +734,7 @@ net::awaitable test_blob_string() gresp.clear(); std::vector expected { {resp3::type::blob_string, 1UL, 0UL, {"hhaa\aaaa\raaaaa\r\naaaaaaaaaa"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "blob_string (with separator)"); } @@ -744,7 +744,7 @@ net::awaitable test_blob_string() gresp.clear(); std::vector expected { {resp3::type::blob_string, 1UL, 0UL, {}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "blob_string (size 0)"); } } @@ -759,7 +759,7 @@ net::awaitable test_simple_error() gresp.clear(); std::vector expected { {resp3::type::simple_error, 1UL, 0UL, {"Error"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "simple_error (message)"); } } @@ -774,7 +774,7 @@ net::awaitable test_floating_point() std::vector resp; std::vector expected { {resp3::type::doublean, 1UL, 0UL, {"1.23"}} }; - co_await async_read(ts, buf, adapt(resp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp)); check_equal(resp, expected, "double"); } @@ -782,7 +782,7 @@ net::awaitable test_floating_point() std::string cmd {",inf\r\n"}; test_tcp_socket ts {cmd}; std::vector resp; - co_await async_read(ts, buf, adapt(resp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp)); std::vector expected { {resp3::type::doublean, 1UL, 0UL, {"inf"}} }; check_equal(resp, expected, "double (inf)"); @@ -792,7 +792,7 @@ net::awaitable test_floating_point() std::string cmd {",-inf\r\n"}; test_tcp_socket ts {cmd}; std::vector resp; - co_await async_read(ts, buf, adapt(resp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp)); std::vector expected { {resp3::type::doublean, 1UL, 0UL, {"-inf"}} }; check_equal(resp, expected, "double (-inf)"); @@ -811,7 +811,7 @@ net::awaitable test_boolean() std::vector expected { {resp3::type::boolean, 1UL, 0UL, {"f"}} }; - co_await async_read(ts, buf, adapt(resp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp)); check_equal(resp, expected, "bool (false)"); } @@ -822,7 +822,7 @@ net::awaitable test_boolean() std::vector expected { {resp3::type::boolean, 1UL, 0UL, {"t"}} }; - co_await async_read(ts, buf, adapt(resp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp)); check_equal(resp, expected, "bool (true)"); } } @@ -837,7 +837,7 @@ net::awaitable test_blob_error() gresp.clear(); std::vector expected { {resp3::type::blob_error, 1UL, 0UL, {"SYNTAX invalid syntax"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "blob_error (message)"); } @@ -848,7 +848,7 @@ net::awaitable test_blob_error() std::vector expected { {resp3::type::blob_error, 1UL, 0UL, {}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "blob_error (empty message)"); } } @@ -863,7 +863,7 @@ net::awaitable test_verbatim_string() gresp.clear(); std::vector expected { {resp3::type::verbatim_string, 1UL, 0UL, {"txt:Some string"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "verbatim_string"); } @@ -871,7 +871,7 @@ net::awaitable test_verbatim_string() std::string cmd {"=0\r\n\r\n"}; test_tcp_socket ts {cmd}; gresp.clear(); - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::verbatim_string, 1UL, 0UL, {}} }; check_equal(gresp, expected, "verbatim_string (empty)"); @@ -896,7 +896,7 @@ net::awaitable test_set2() , {resp3::type::simple_string, 1UL, 1UL, {"three"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "test set (1)"); } @@ -909,7 +909,7 @@ net::awaitable test_set2() { {resp3::type::set, 0UL, 0UL, {}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "test set (2)"); } } @@ -922,7 +922,7 @@ net::awaitable test_map() std::string cmd {"%7\r\n$6\r\nserver\r\n$5\r\nredis\r\n$7\r\nversion\r\n$5\r\n6.0.9\r\n$5\r\nproto\r\n:3\r\n$2\r\nid\r\n:203\r\n$4\r\nmode\r\n$10\r\nstandalone\r\n$4\r\nrole\r\n$6\r\nmaster\r\n$7\r\nmodules\r\n*0\r\n"}; test_tcp_socket ts {cmd}; gresp.clear(); - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::map, 7UL, 0UL, {}} @@ -948,7 +948,7 @@ net::awaitable test_map() std::string cmd {"%0\r\n"}; test_tcp_socket ts {cmd}; gresp.clear(); - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); std::vector expected { {resp3::type::map, 0UL, 0UL, {}} }; check_equal(gresp, expected, "test map (empty)"); @@ -965,7 +965,7 @@ net::awaitable test_streamed_string() gresp.clear(); std::vector expected { {resp3::type::streamed_string_part, 1UL, 0UL, {"Hello world"}} }; - co_await async_read(ts, buf, adapt(gresp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp)); check_equal(gresp, expected, "streamed string"); } @@ -973,7 +973,7 @@ net::awaitable test_streamed_string() std::string cmd {"$?\r\n;0\r\n"}; test_tcp_socket ts {cmd}; std::vector resp; - co_await async_read(ts, buf, adapt(resp)); + co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp)); std::vector expected { {resp3::type::streamed_string_part, 1UL, 0UL, {}} }; diff --git a/tools/commands.cpp b/tools/commands.cpp index 4725b254..ef7d7f39 100644 --- a/tools/commands.cpp +++ b/tools/commands.cpp @@ -11,16 +11,17 @@ #include +namespace resp3 = aedis::resp3; using aedis::command; -using aedis::resp3::serializer; -using aedis::resp3::read; -using aedis::resp3::adapt; -using aedis::resp3::node; +using resp3::serializer; +using resp3::adapt; +using resp3::node; namespace net = aedis::net; using net::ip::tcp; using net::write; using net::buffer; +using net::dynamic_buffer; std::string toupper(std::string s) { @@ -87,9 +88,9 @@ int main() std::vector resp; std::string buffer; - read(socket, buffer); - read(socket, buffer, adapt(resp)); - read(socket, buffer); + resp3::read(socket, dynamic_buffer(buffer)); + resp3::read(socket, dynamic_buffer(buffer), adapt(resp)); + resp3::read(socket, dynamic_buffer(buffer)); auto const cmds = get_cmd_names(resp); print_cmds_enum(cmds);