From 05aff2f3c623c503bc385baa2ffefefc295cd633 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 10 Oct 2021 15:00:22 +0200 Subject: [PATCH] Refactoring listed below: - Improvements in the organization. - Adds support for the default token on the consumer. --- examples/async_basic.cpp | 11 +- include/aedis/aedis.hpp | 3 +- include/aedis/command.hpp | 7 +- include/aedis/resp3/consumer.hpp | 48 +++++ include/aedis/resp3/detail/assemble.hpp | 112 +++++++++++ include/aedis/resp3/detail/impl/assemble.ipp | 44 +++++ include/aedis/resp3/detail/parser.hpp | 77 +++++++- include/aedis/resp3/{ => detail}/read.hpp | 119 ++---------- include/aedis/resp3/detail/write.hpp | 114 +++++++++++ include/aedis/resp3/impl/request.ipp | 41 ---- include/aedis/resp3/impl/write.ipp | 3 +- include/aedis/resp3/node.hpp | 2 +- include/aedis/resp3/request.hpp | 188 +++++-------------- include/aedis/resp3/type.hpp | 5 +- include/aedis/resp3/write.hpp | 94 +--------- include/aedis/src.hpp | 2 +- include/aedis/version.hpp | 2 +- tests/general.cpp | 107 +++++------ 18 files changed, 540 insertions(+), 439 deletions(-) create mode 100644 include/aedis/resp3/consumer.hpp create mode 100644 include/aedis/resp3/detail/assemble.hpp create mode 100644 include/aedis/resp3/detail/impl/assemble.ipp rename include/aedis/resp3/{ => detail}/read.hpp (61%) create mode 100644 include/aedis/resp3/detail/write.hpp diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index d636088a..44d3b3a5 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -9,6 +9,8 @@ using namespace aedis; +using tcp_socket = net::use_awaitable_t<>::as_default_on_t; + void print_event(std::pair const& p) { std::cout << "Event: " << p.first << "."; @@ -20,8 +22,9 @@ void print_event(std::pair const& p) } net::awaitable -example(net::ip::tcp::socket& socket, - std::queue& requests) +example( + tcp_socket& socket, + std::queue& requests) { requests.push({}); requests.back().hello("3"); @@ -31,7 +34,7 @@ example(net::ip::tcp::socket& socket, for (;;) { resp.clear(); - co_await cs.async_consume(socket, requests, resp, net::use_awaitable); + co_await cs.async_consume(socket, requests, resp); std::cout << resp << std::endl; if (resp.get_type() == resp3::type::push) @@ -66,7 +69,7 @@ int main() net::ip::tcp::resolver resolver{ioc}; auto const res = resolver.resolve("127.0.0.1", "6379"); - net::ip::tcp::socket socket{ioc}; + tcp_socket socket{ioc}; net::connect(socket, res); std::queue requests; diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index 5a7e7c1f..48a62141 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -10,4 +10,5 @@ #include #include #include -#include +#include +#include diff --git a/include/aedis/command.hpp b/include/aedis/command.hpp index 25f290a3..ec632fe4 100644 --- a/include/aedis/command.hpp +++ b/include/aedis/command.hpp @@ -11,7 +11,7 @@ namespace aedis { -/// Supported redis commands. +/// List of the supported redis commands. enum class command { acl_load , acl_save @@ -75,7 +75,12 @@ enum class command , unknown }; +/// Converts the command to a string. std::string to_string(command c); + +/** Writes the text representation of the command to the output + * stream. + */ std::ostream& operator<<(std::ostream& os, command c); } // aedis diff --git a/include/aedis/resp3/consumer.hpp b/include/aedis/resp3/consumer.hpp new file mode 100644 index 00000000..592dfa43 --- /dev/null +++ b/include/aedis/resp3/consumer.hpp @@ -0,0 +1,48 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include + +#include +#include +#include +#include + +namespace aedis { +namespace resp3 { + +/** Reads and writes redis commands. + */ +struct consumer { + std::string buffer; + net::coroutine coro = net::coroutine(); + type t = type::invalid; + + template< + class AsyncReadWriteStream, + class CompletionToken = + net::default_completion_token_t + > + auto async_consume( + AsyncReadWriteStream& stream, + std::queue& requests, + response& resp, + CompletionToken&& token = + net::default_completion_token_t{}) + { + return net::async_compose< + CompletionToken, + void(boost::system::error_code, type)>( + detail::consumer_op{stream, buffer, requests, resp, t, coro}, + token, stream); + } +}; + +} // resp3 +} // aedis diff --git a/include/aedis/resp3/detail/assemble.hpp b/include/aedis/resp3/detail/assemble.hpp new file mode 100644 index 00000000..90bcc164 --- /dev/null +++ b/include/aedis/resp3/detail/assemble.hpp @@ -0,0 +1,112 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace aedis { +namespace resp3 { +namespace detail { + +void add_bulk(std::string& to, std::string_view param); +void add_header(std::string& to, int size); + +struct accumulator { + auto + operator()( + std::string a, + std::string_view b) const + { + add_bulk(a, b); + return a; + } + + template + auto + operator()( + std::string a, + T b, + typename std::enable_if<(std::is_integral::value || std::is_floating_point::value), bool>::type = false) const + { + auto const v = std::to_string(b); + add_bulk(a, v); + return a; + } + + auto + operator()( + std::string a, + std::pair b) const + { + add_bulk(a, b.first); + add_bulk(a, b.second); + return a; + } + + template + auto + operator()( + std::string a, + std::pair b, + typename std::enable_if<(std::is_integral::value || std::is_floating_point::value), bool>::type = false) const + { + auto const v = std::to_string(b.first); + add_bulk(a, v); + add_bulk(a, b.second); + return a; + } +}; + +void assemble(std::string& ret, std::string_view cmd); + +template +void assemble( std::string& ret + , std::string_view cmd + , std::initializer_list key + , Iter begin + , Iter end + , int size = 1) +{ + auto const d1 = + std::distance( std::cbegin(key) + , std::cend(key)); + + auto const d2 = std::distance(begin, end); + + std::string a; + add_header(a, 1 + d1 + size * d2); + add_bulk(a, cmd); + + auto b = + std::accumulate( std::cbegin(key) + , std::cend(key) + , std::move(a) + , accumulator{}); + + ret += + std::accumulate( begin + , end + , std::move(b) + , accumulator{}); +} + +void assemble(std::string& ret, std::string_view cmd, std::string_view key); + +} // detail +} // resp3 +} // aedis diff --git a/include/aedis/resp3/detail/impl/assemble.ipp b/include/aedis/resp3/detail/impl/assemble.ipp new file mode 100644 index 00000000..959dd44d --- /dev/null +++ b/include/aedis/resp3/detail/impl/assemble.ipp @@ -0,0 +1,44 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include + +namespace aedis { +namespace resp3 { +namespace detail { + +void add_bulk(std::string& to, std::string_view param) +{ + to += "$"; + to += std::to_string(std::size(param)); + to += "\r\n"; + to += param; + to += "\r\n"; +} + +void add_header(std::string& to, int size) +{ + to += "*"; + to += std::to_string(size); + to += "\r\n"; +} + +void assemble(std::string& ret, std::string_view cmd) +{ + add_header(ret, 1); + add_bulk(ret, cmd); +} + +void assemble(std::string& ret, std::string_view cmd, std::string_view key) +{ + std::initializer_list dummy; + assemble(ret, cmd, {key}, std::cbegin(dummy), std::cend(dummy)); +} + +} // detail +} // resp3 +} // aedis diff --git a/include/aedis/resp3/detail/parser.hpp b/include/aedis/resp3/detail/parser.hpp index 52797704..e684d463 100644 --- a/include/aedis/resp3/detail/parser.hpp +++ b/include/aedis/resp3/detail/parser.hpp @@ -10,7 +10,9 @@ #include #include -namespace aedis { namespace resp3 { namespace detail { +namespace aedis { +namespace resp3 { +namespace detail { // resp3 parser. class parser { @@ -48,6 +50,79 @@ public: auto bulk_length() const noexcept { return bulk_length_; } }; +// The parser supports up to 5 levels of nested structures. The first +// element in the sizes stack is a sentinel and must be different from +// 1. +template +class parse_op { +private: + AsyncReadStream& stream_; + Storage* buf_ = nullptr; + detail::parser parser_; + int start_ = 1; + +public: + parse_op(AsyncReadStream& stream, Storage* buf, response_adapter_base* res) + : stream_ {stream} + , buf_ {buf} + , parser_ {res} + { } + + template + void operator()( Self& self + , boost::system::error_code ec = {} + , std::size_t n = 0) + { + switch (start_) { + for (;;) { + if (parser_.bulk() == detail::parser::bulk_type::none) { + case 1: + start_ = 0; + net::async_read_until( + stream_, + net::dynamic_buffer(*buf_), + "\r\n", + std::move(self)); + + return; + } + + // On a bulk read we can't read until delimiter since the + // payload may contain the delimiter itself so we have to + // read the whole chunk. However if the bulk blob is small + // enough it may be already on the buffer buf_ we read + // last time. If it is, there is no need of initiating + // another async op otherwise we have to read the + // missing bytes. + if (std::ssize(*buf_) < (parser_.bulk_length() + 2)) { + start_ = 0; + auto const s = std::ssize(*buf_); + auto const l = parser_.bulk_length(); + auto const to_read = static_cast(l + 2 - s); + buf_->resize(l + 2); + net::async_read( + stream_, + net::buffer(buf_->data() + s, to_read), + net::transfer_all(), + std::move(self)); + return; + } + + default: + { + if (ec) + return self.complete(ec); + + n = parser_.advance(buf_->data(), n); + buf_->erase(0, n); + if (parser_.done()) + return self.complete({}); + } + } + } + } +}; + } // detail } // resp3 } // aedis diff --git a/include/aedis/resp3/read.hpp b/include/aedis/resp3/detail/read.hpp similarity index 61% rename from include/aedis/resp3/read.hpp rename to include/aedis/resp3/detail/read.hpp index fc590164..35b8b943 100644 --- a/include/aedis/resp3/read.hpp +++ b/include/aedis/resp3/detail/read.hpp @@ -7,93 +7,20 @@ #pragma once -#include - #include -#include #include #include #include -#include #include +#include +#include #include -namespace aedis { namespace resp3 { - -// The parser supports up to 5 levels of nested structures. The first -// element in the sizes stack is a sentinel and must be different from -// 1. -template -class parse_op { -private: - AsyncReadStream& stream_; - Storage* buf_ = nullptr; - detail::parser parser_; - int start_ = 1; - -public: - parse_op(AsyncReadStream& stream, Storage* buf, response_adapter_base* res) - : stream_ {stream} - , buf_ {buf} - , parser_ {res} - { } - - template - void operator()( Self& self - , boost::system::error_code ec = {} - , std::size_t n = 0) - { - switch (start_) { - for (;;) { - if (parser_.bulk() == detail::parser::bulk_type::none) { - case 1: - start_ = 0; - net::async_read_until( - stream_, - net::dynamic_buffer(*buf_), - "\r\n", - std::move(self)); - - return; - } - - // On a bulk read we can't read until delimiter since the - // payload may contain the delimiter itself so we have to - // read the whole chunk. However if the bulk blob is small - // enough it may be already on the buffer buf_ we read - // last time. If it is, there is no need of initiating - // another async op otherwise we have to read the - // missing bytes. - if (std::ssize(*buf_) < (parser_.bulk_length() + 2)) { - start_ = 0; - auto const s = std::ssize(*buf_); - auto const l = parser_.bulk_length(); - auto const to_read = static_cast(l + 2 - s); - buf_->resize(l + 2); - net::async_read( - stream_, - net::buffer(buf_->data() + s, to_read), - net::transfer_all(), - std::move(self)); - return; - } - - default: - { - if (ec) - return self.complete(ec); - - n = parser_.advance(buf_->data(), n); - buf_->erase(0, n); - if (parser_.done()) - return self.complete({}); - } - } - } - } -}; +namespace aedis { +namespace resp3 { +namespace detail { template auto read( @@ -102,10 +29,10 @@ auto read( response_adapter_base& res, boost::system::error_code& ec) { - detail::parser p {&res}; + parser p {&res}; std::size_t n = 0; do { - if (p.bulk() == detail::parser::bulk_type::none) { + if (p.bulk() == parser::bulk_type::none) { n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec); if (ec || n < 3) return n; @@ -226,8 +153,9 @@ auto async_read_type( >(type_op {stream, &buffer}, token, stream); } +template struct consumer_op { - net::ip::tcp::socket& socket; + AsyncReadWriteStream& stream; std::string& buffer; std::queue& requests; response& resp; @@ -242,7 +170,7 @@ struct consumer_op { { reenter (coro) for (;;) { - yield async_write_some(socket, requests, std::move(self)); + yield async_write_some(stream, requests, std::move(self)); if (ec) { self.complete(ec, type::invalid); return; @@ -250,7 +178,7 @@ struct consumer_op { do { do { - yield async_read_type(socket, buffer, std::move(self)); + yield async_read_type(stream, buffer, std::move(self)); if (ec) { self.complete(ec, type::invalid); return; @@ -262,11 +190,11 @@ struct consumer_op { { if (m_type == type::push) { auto* adapter = resp.select_adapter(m_type, command::unknown, {}); - async_read_one(socket, buffer, *adapter, std::move(self)); + async_read_one(stream, buffer, *adapter, std::move(self)); } else { auto const& pair = requests.front().ids.front(); auto* adapter = resp.select_adapter(m_type, pair.first, pair.second); - async_read_one(socket, buffer, *adapter, std::move(self)); + async_read_one(stream, buffer, *adapter, std::move(self)); } } @@ -287,26 +215,7 @@ struct consumer_op { } }; -struct consumer { - std::string buffer; - response resp; - net::coroutine coro = net::coroutine(); - type t = type::invalid; - - template - auto async_consume( - net::ip::tcp::socket& socket, - std::queue& requests, - response& resp, - CompletionToken&& token) - { - return net::async_compose< - CompletionToken, - void(boost::system::error_code, type)>( - consumer_op{socket, buffer, requests, resp, t, coro}, token, socket); - } -}; - +} // detail } // resp3 } // aedis diff --git a/include/aedis/resp3/detail/write.hpp b/include/aedis/resp3/detail/write.hpp new file mode 100644 index 00000000..c7747149 --- /dev/null +++ b/include/aedis/resp3/detail/write.hpp @@ -0,0 +1,114 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include + +#include +#include + +#include +#include + +namespace aedis { +namespace resp3 { +namespace detail { + +template +std::size_t +write( + SyncWriteStream& stream, + request& req, + boost::system::error_code& ec) +{ + static_assert(boost::beast::is_sync_write_stream::value, + "SyncWriteStream type requirements not met"); + + return write(stream, net::buffer(req.payload), ec); +} + +template +std::size_t write( + SyncWriteStream& stream, + request& req) +{ + static_assert(boost::beast::is_sync_write_stream::value, + "SyncWriteStream type requirements not met"); + + boost::system::error_code ec; + auto const bytes_transferred = write(stream, req, ec); + + if (ec) + BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); + + return bytes_transferred; +} + +/** Asynchronously writes one or more requests on the stream. + */ +template +struct write_some_op { + AsyncWriteStream& stream; + std::queue& requests; + net::coroutine coro = net::coroutine(); + + void + operator()( + auto& self, + boost::system::error_code const& ec = {}, + std::size_t n = 0) + { + reenter (coro) { + do { + assert(!std::empty(requests)); + assert(!std::empty(requests.front().payload)); + + yield async_write( + stream, + net::buffer(requests.front().payload), + std::move(self)); + + if (ec) + break; + + requests.front().sent = true; + + if (std::empty(requests.front().ids)) { + // We only pop when all commands in the pipeline has push + // responses like subscribe, otherwise, pop is done when the + // response arrives. + requests.pop(); + } + } while (!std::empty(requests) && std::empty(requests.front().ids)); + + self.complete(ec); + } + } +}; + +template< + class AsyncWriteStream, + class CompletionToken> +auto +async_write_some( + AsyncWriteStream& stream, + std::queue& requests, + CompletionToken&& token) +{ + return net::async_compose< + CompletionToken, + void(boost::system::error_code)>( + write_some_op{stream, requests}, + token, stream); +} + +} // detail +} // resp3 +} // aedis + +#include diff --git a/include/aedis/resp3/impl/request.ipp b/include/aedis/resp3/impl/request.ipp index bfc1455e..e69de29b 100644 --- a/include/aedis/resp3/impl/request.ipp +++ b/include/aedis/resp3/impl/request.ipp @@ -1,41 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include - -namespace aedis { namespace resp3 { - -void add_bulk(std::string& to, std::string_view param) -{ - to += "$"; - to += std::to_string(std::size(param)); - to += "\r\n"; - to += param; - to += "\r\n"; -} - -void add_header(std::string& to, int size) -{ - to += "*"; - to += std::to_string(size); - to += "\r\n"; -} - -void assemble(std::string& ret, std::string_view cmd) -{ - add_header(ret, 1); - add_bulk(ret, cmd); -} - -void assemble(std::string& ret, std::string_view cmd, std::string_view key) -{ - std::initializer_list dummy; - assemble(ret, cmd, {key}, std::cbegin(dummy), std::cend(dummy)); -} - -} // resp3 -} // aedis diff --git a/include/aedis/resp3/impl/write.ipp b/include/aedis/resp3/impl/write.ipp index 9b68d15e..7f18c00e 100644 --- a/include/aedis/resp3/impl/write.ipp +++ b/include/aedis/resp3/impl/write.ipp @@ -7,7 +7,8 @@ #include -namespace aedis { namespace resp3 { +namespace aedis { +namespace resp3 { bool prepare_next(std::queue& reqs) { diff --git a/include/aedis/resp3/node.hpp b/include/aedis/resp3/node.hpp index 5af102bc..9fc47c3e 100644 --- a/include/aedis/resp3/node.hpp +++ b/include/aedis/resp3/node.hpp @@ -34,7 +34,7 @@ bool operator==(node const& a, node const& b); /** Writes the text representation of node to the output stream. * - * NOTE: Be careful when printing binary data. + * NOTE: Bonary data is not converted to text. */ std::ostream& operator<<(std::ostream& os, node const& o); diff --git a/include/aedis/resp3/request.hpp b/include/aedis/resp3/request.hpp index 7b0dbf95..aa33bdac 100644 --- a/include/aedis/resp3/request.hpp +++ b/include/aedis/resp3/request.hpp @@ -10,103 +10,15 @@ #include #include #include -#include -#include -#include -#include #include #include #include +#include namespace aedis { namespace resp3 { -// TODO: Move to detail. -void add_bulk(std::string& to, std::string_view param); -void add_header(std::string& to, int size); - -struct accumulator { - auto - operator()( - std::string a, - std::string_view b) const - { - add_bulk(a, b); - return a; - } - - template - auto - operator()( - std::string a, - T b, - typename std::enable_if<(std::is_integral::value || std::is_floating_point::value), bool>::type = false) const - { - auto const v = std::to_string(b); - add_bulk(a, v); - return a; - } - - auto - operator()( - std::string a, - std::pair b) const - { - add_bulk(a, b.first); - add_bulk(a, b.second); - return a; - } - - template - auto - operator()( - std::string a, - std::pair b, - typename std::enable_if<(std::is_integral::value || std::is_floating_point::value), bool>::type = false) const - { - auto const v = std::to_string(b.first); - add_bulk(a, v); - add_bulk(a, b.second); - return a; - } -}; - -void assemble(std::string& ret, std::string_view cmd); - -template -void assemble( std::string& ret - , std::string_view cmd - , std::initializer_list key - , Iter begin - , Iter end - , int size = 1) -{ - auto const d1 = - std::distance( std::cbegin(key) - , std::cend(key)); - - auto const d2 = std::distance(begin, end); - - std::string a; - add_header(a, 1 + d1 + size * d2); - add_bulk(a, cmd); - - auto b = - std::accumulate( std::cbegin(key) - , std::cend(key) - , std::move(a) - , accumulator{}); - - ret += - std::accumulate( begin - , end - , std::move(b) - , accumulator{}); -} - -void assemble(std::string& ret, std::string_view cmd, std::string_view key); - /** A class to compose redis requests * * A request is composed of one or more redis commands and is @@ -140,66 +52,66 @@ public: void ping() { - assemble(payload, "PING"); + detail::assemble(payload, "PING"); ids.push(std::make_pair(command::ping, std::string{})); } void quit() { - assemble(payload, "QUIT"); + detail::assemble(payload, "QUIT"); ids.push(std::make_pair(command::quit, std::string{})); } void multi() { - assemble(payload, "MULTI"); + detail::assemble(payload, "MULTI"); ids.push(std::make_pair(command::multi, std::string{})); } void exec() { - assemble(payload, "EXEC"); + detail::assemble(payload, "EXEC"); ids.push(std::make_pair(command::exec, std::string{})); } void incr(std::string_view key) { - assemble(payload, "INCR", key); + detail::assemble(payload, "INCR", key); ids.push(std::make_pair(command::incr, std::string{key})); } /// Adds auth to the request, see https://redis.io/commands/bgrewriteaof void auth(std::string_view pwd) { - assemble(payload, "AUTH", pwd); + detail::assemble(payload, "AUTH", pwd); ids.push(std::make_pair(command::auth, std::string{})); } /// Adds bgrewriteaof to the request, see https://redis.io/commands/bgrewriteaof void bgrewriteaof() { - assemble(payload, "BGREWRITEAOF"); + detail::assemble(payload, "BGREWRITEAOF"); ids.push(std::make_pair(command::bgrewriteaof, std::string{})); } /// Adds role to the request, see https://redis.io/commands/role void role() { - assemble(payload, "ROLE"); + detail::assemble(payload, "ROLE"); ids.push(std::make_pair(command::role, std::string{})); } /// Adds bgsave to the request, see //https://redis.io/commands/bgsave void bgsave() { - assemble(payload, "BGSAVE"); + detail::assemble(payload, "BGSAVE"); ids.push(std::make_pair(command::bgsave, std::string{})); } /// Adds ping to the request, see https://redis.io/commands/flushall void flushall() { - assemble(payload, "FLUSHALL"); + detail::assemble(payload, "FLUSHALL"); ids.push(std::make_pair(command::flushall, std::string{})); } @@ -207,10 +119,10 @@ public: void lpop(std::string_view key, int count = 1) { //if (count == 1) { - assemble(payload, "LPOP", key); + detail::assemble(payload, "LPOP", key); //} else { //auto par = {std::to_string(count)}; - //assemble( + //detail::assemble( // payload, // "LPOP", // {key}, @@ -225,34 +137,34 @@ public: void subscribe(std::string_view key) { // The response to this command is a push. - assemble(payload, "SUBSCRIBE", key); + detail::assemble(payload, "SUBSCRIBE", key); } /// Adds ping to the request, see https://redis.io/commands/unsubscribe void unsubscribe(std::string_view key) { // The response to this command is a push. - assemble(payload, "UNSUBSCRIBE", key); + detail::assemble(payload, "UNSUBSCRIBE", key); } /// Adds ping to the request, see https://redis.io/commands/get void get(std::string_view key) { - assemble(payload, "GET", key); + detail::assemble(payload, "GET", key); ids.push(std::make_pair(command::get, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/keys void keys(std::string_view pattern) { - assemble(payload, "KEYS", pattern); + detail::assemble(payload, "KEYS", pattern); ids.push(std::make_pair(command::keys, std::string{})); } /// Adds ping to the request, see https://redis.io/commands/hello void hello(std::string_view version = "3") { - assemble(payload, "HELLO", version); + detail::assemble(payload, "HELLO", version); ids.push(std::make_pair(command::hello, std::string{})); } @@ -260,7 +172,7 @@ public: void sentinel(std::string_view arg, std::string_view name) { auto par = {name}; - assemble(payload, "SENTINEL", {arg}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "SENTINEL", {arg}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::sentinel, std::string{})); } @@ -268,7 +180,7 @@ public: void append(std::string_view key, std::string_view msg) { auto par = {msg}; - assemble(payload, "APPEND", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "APPEND", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::append, std::string{key})); } @@ -279,7 +191,7 @@ public: auto const end_str = std::to_string(end); std::initializer_list par {start_str, end_str}; - assemble( payload + detail::assemble( payload , "BITCOUNT" , {key} , std::cbegin(par) @@ -291,7 +203,7 @@ public: template void rpush(std::string_view key, Iter begin, Iter end) { - assemble(payload, "RPUSH", {key}, begin, end); + detail::assemble(payload, "RPUSH", {key}, begin, end); ids.push(std::make_pair(command::rpush, std::string{key})); } @@ -315,21 +227,21 @@ public: template void lpush(std::string_view key, Iter begin, Iter end) { - assemble(payload, "LPUSH", {key}, begin, end); + detail::assemble(payload, "LPUSH", {key}, begin, end); ids.push(std::make_pair(command::lpush, std::string{key})); } void psubscribe( std::initializer_list l) { std::initializer_list dummy = {}; - assemble(payload, "PSUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy)); + detail::assemble(payload, "PSUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy)); } /// Adds ping to the request, see https://redis.io/commands/publish void publish(std::string_view key, std::string_view msg) { auto par = {msg}; - assemble(payload, "PUBLISH", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "PUBLISH", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::publish, std::string{key})); } @@ -337,7 +249,7 @@ public: void set(std::string_view key, std::initializer_list args) { - assemble(payload, "SET", {key}, std::cbegin(args), std::cend(args)); + detail::assemble(payload, "SET", {key}, std::cbegin(args), std::cend(args)); ids.push(std::make_pair(command::set, std::string{key})); } @@ -350,7 +262,7 @@ public: //error: ERR Protocol error: expected '$', got '*' using std::cbegin; using std::cend; - assemble(payload, "HSET", {key}, std::cbegin(r), std::cend(r), 2); + detail::assemble(payload, "HSET", {key}, std::cbegin(r), std::cend(r), 2); ids.push(std::make_pair(command::hset, std::string{key})); } @@ -359,7 +271,7 @@ public: { auto by_str = std::to_string(by); std::initializer_list par {field, by_str}; - assemble(payload, "HINCRBY", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "HINCRBY", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::hincrby, std::string{key})); } @@ -367,28 +279,28 @@ public: void hkeys(std::string_view key) { auto par = {""}; - assemble(payload, "HKEYS", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "HKEYS", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::hkeys, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hlen void hlen(std::string_view key) { - assemble(payload, "HLEN", {key}); + detail::assemble(payload, "HLEN", {key}); ids.push(std::make_pair(command::hlen, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hgetall void hgetall(std::string_view key) { - assemble(payload, "HGETALL", {key}); + detail::assemble(payload, "HGETALL", {key}); ids.push(std::make_pair(command::hgetall, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hvals void hvals( std::string_view key) { - assemble(payload, "HVALS", {key}); + detail::assemble(payload, "HVALS", {key}); ids.push(std::make_pair(command::hvals, std::string{key})); } @@ -396,7 +308,7 @@ public: void hget(std::string_view key, std::string_view field) { auto par = {field}; - assemble(payload, "HGET", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "HGET", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::hget, std::string{key})); } @@ -405,7 +317,7 @@ public: std::string_view key, std::initializer_list fields) { - assemble( payload + detail::assemble( payload , "HMGET" , {key} , std::cbegin(fields) @@ -419,7 +331,7 @@ public: hdel(std::string_view key, std::initializer_list fields) { - assemble( + detail::assemble( payload, "HDEL", {key}, @@ -434,7 +346,7 @@ public: { auto const str = std::to_string(secs); std::initializer_list par {str}; - assemble(payload, "EXPIRE", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "EXPIRE", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::expire, std::string{key})); } @@ -443,7 +355,7 @@ public: { auto const score_str = std::to_string(score); std::initializer_list par = {score_str, value}; - assemble(payload, "ZADD", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "ZADD", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::zadd, std::string{key})); } @@ -451,7 +363,7 @@ public: template void zadd(std::initializer_list key, Range const& r) { - assemble(payload, "ZADD", key, std::cbegin(r), std::cend(r), 2); + detail::assemble(payload, "ZADD", key, std::cbegin(r), std::cend(r), 2); ids.push(std::make_pair(command::zadd, std::string{key})); } @@ -462,7 +374,7 @@ public: auto const max_str = std::to_string(max); std::initializer_list par {min_str, max_str}; - assemble(payload, "ZRANGE", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "ZRANGE", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::zrange, std::string{key})); } @@ -475,7 +387,7 @@ public: auto const min_str = std::to_string(min); auto par = {min_str , max_str}; - assemble(payload, "ZRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "ZRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::zrangebyscore, std::string{key})); } @@ -487,7 +399,7 @@ public: std::string_view max) { auto par = {min, max}; - assemble(payload, "ZREMRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "ZREMRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::zremrangebyscore, std::string{key})); } @@ -497,7 +409,7 @@ public: auto const min_str = std::to_string(min); auto const max_str = std::to_string(max); std::initializer_list par {min_str, max_str}; - assemble(payload, "LRANGE", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "LRANGE", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::lrange, std::string{key})); } @@ -507,7 +419,7 @@ public: auto const min_str = std::to_string(min); auto const max_str = std::to_string(max); std::initializer_list par {min_str, max_str}; - assemble(payload, "LTRIM", {key}, std::cbegin(par), std::cend(par)); + detail::assemble(payload, "LTRIM", {key}, std::cbegin(par), std::cend(par)); ids.push(std::make_pair(command::ltrim, std::string{key})); } @@ -515,14 +427,14 @@ public: /// Adds ping to the request, see https://redis.io/commands/del void del(std::string_view key) { - assemble(payload, "DEL", key); + detail::assemble(payload, "DEL", key); ids.push(std::make_pair(command::del, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/llen void llen(std::string_view key) { - assemble(payload, "LLEN", key); + detail::assemble(payload, "LLEN", key); ids.push(std::make_pair(command::llen, std::string{key})); } @@ -530,7 +442,7 @@ public: template void sadd(std::string_view key, Iter begin, Iter end) { - assemble(payload, "SADD", {key}, begin, end); + detail::assemble(payload, "SADD", {key}, begin, end); ids.push(std::make_pair(command::sadd, std::string{key})); } @@ -546,28 +458,28 @@ public: /// Adds ping to the request, see https://redis.io/commands/smembers void smembers(std::string_view key) { - assemble(payload, "SMEMBERS", key); + detail::assemble(payload, "SMEMBERS", key); ids.push(std::make_pair(command::smembers, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/scard void scard(std::string_view key) { - assemble(payload, "SCARD", key); + detail::assemble(payload, "SCARD", key); ids.push(std::make_pair(command::scard, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/scard void scard(std::string_view key, std::initializer_list l) { - assemble(payload, "SDIFF", {key}, std::cbegin(l), std::cend(l)); + detail::assemble(payload, "SDIFF", {key}, std::cbegin(l), std::cend(l)); ids.push(std::make_pair(command::sdiff, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/client_id void client_id(std::string_view parameters) { - assemble(payload, "CLIENT ID", {parameters}); + detail::assemble(payload, "CLIENT ID", {parameters}); ids.push(std::make_pair(command::client_id, std::string{})); } }; diff --git a/include/aedis/resp3/type.hpp b/include/aedis/resp3/type.hpp index 361e691a..d5c4730b 100644 --- a/include/aedis/resp3/type.hpp +++ b/include/aedis/resp3/type.hpp @@ -11,7 +11,8 @@ #include #include -namespace aedis { namespace resp3 { +namespace aedis { +namespace resp3 { /// RESP3 message types. enum class type @@ -40,7 +41,7 @@ std::string to_string(type t); // TODO: Move to detail? type to_type(char c); -/// Writes the string representation of type to the stream. +/// Writes the string representation of type to the output stream. std::ostream& operator<<(std::ostream& os, type t); } // resp3 diff --git a/include/aedis/resp3/write.hpp b/include/aedis/resp3/write.hpp index db1dac83..d630be50 100644 --- a/include/aedis/resp3/write.hpp +++ b/include/aedis/resp3/write.hpp @@ -15,97 +15,13 @@ #include #include -namespace aedis { namespace resp3 { +namespace aedis { +namespace resp3 { -bool prepare_next(std::queue& reqs); - -template -std::size_t -write( - SyncWriteStream& stream, - request& req, - boost::system::error_code& ec) -{ - static_assert(boost::beast::is_sync_write_stream::value, - "SyncWriteStream type requirements not met"); - - return write(stream, net::buffer(req.payload), ec); -} - -template -std::size_t write( - SyncWriteStream& stream, - request& req) -{ - static_assert(boost::beast::is_sync_write_stream::value, - "SyncWriteStream type requirements not met"); - - boost::system::error_code ec; - auto const bytes_transferred = write(stream, req, ec); - - if (ec) - BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); - - return bytes_transferred; -} - -/** Asynchronously writes one or more requests on the stream. +/** Prepares the back of a queue to receive further commands and + * returns true if a write is possible. */ -template -struct write_some_op { - AsyncWriteStream& stream; - std::queue& requests; - net::coroutine coro = net::coroutine(); - - void - operator()( - auto& self, - boost::system::error_code const& ec = {}, - std::size_t n = 0) - { - reenter (coro) { - do { - assert(!std::empty(requests)); - assert(!std::empty(requests.front().payload)); - - yield async_write( - stream, - net::buffer(requests.front().payload), - std::move(self)); - - if (ec) - break; - - requests.front().sent = true; - - if (std::empty(requests.front().ids)) { - // We only pop when all commands in the pipeline has push - // responses like subscribe, otherwise, pop is done when the - // response arrives. - requests.pop(); - } - } while (!std::empty(requests) && std::empty(requests.front().ids)); - - self.complete(ec); - } - } -}; - -template< - class AsyncWriteStream, - class CompletionToken> -auto -async_write_some( - AsyncWriteStream& stream, - std::queue& requests, - CompletionToken&& token) -{ - return net::async_compose< - CompletionToken, - void(boost::system::error_code)>( - write_some_op{stream, requests}, - token, stream); -} +bool prepare_next(std::queue& reqs); } // resp3 } // aedis diff --git a/include/aedis/src.hpp b/include/aedis/src.hpp index c56c64cf..f58b7276 100644 --- a/include/aedis/src.hpp +++ b/include/aedis/src.hpp @@ -7,8 +7,8 @@ #include #include -#include #include #include #include +#include #include diff --git a/include/aedis/version.hpp b/include/aedis/version.hpp index 5c101755..c7ca5f7d 100644 --- a/include/aedis/version.hpp +++ b/include/aedis/version.hpp @@ -5,5 +5,5 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -#define AEDIS_VERSION 4 +#define AEDIS_VERSION 5 diff --git a/tests/general.cpp b/tests/general.cpp index 4233b826..a4c649ba 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -6,6 +6,7 @@ */ #include +#include #include "test_stream.hpp" @@ -31,6 +32,7 @@ using flat_array_int_type = detail::basic_flat_array; } // aedis using namespace aedis; +using namespace aedis::resp3; resp3::response::storage_type array_buffer; resp3::detail::response_adapter radapter{&array_buffer}; @@ -131,10 +133,9 @@ test_general(net::ip::tcp::resolver::results_type const& res) int push_counter = 0; for (;;) { resp.clear(); - auto const t = - co_await cs.async_consume(socket, requests, resp, net::use_awaitable); + co_await cs.async_consume(socket, requests, resp, net::use_awaitable); - if (t == resp3::type::push) { + if (resp.get_type() == resp3::type::push) { switch (push_counter) { case 0: { @@ -368,7 +369,7 @@ test_general(net::ip::tcp::resolver::results_type const& res) } break; case command::lpop: { - switch (t) + switch (resp.get_type()) { case resp3::type::blob_string: { @@ -428,7 +429,7 @@ test_general(net::ip::tcp::resolver::results_type const& res) }; check_equal(resp.raw(), expected, "smembers (value)"); } break; - default: { std::cout << "Error: " << t << " " << id.first << std::endl; } + default: { std::cout << "Error: " << resp.get_type() << " " << id.first << std::endl; } } resp.raw().clear(); @@ -461,13 +462,13 @@ test_list(net::ip::tcp::resolver::results_type const& results) { // hello array_buffer.clear(); radapter.clear(); - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); } { // flushall array_buffer.clear(); radapter.clear(); - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {"OK"}} }; check_equal(array_buffer, expected, "flushall"); @@ -478,21 +479,21 @@ test_list(net::ip::tcp::resolver::results_type const& results) radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::number, {"6"}} }; - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); check_equal(array_buffer, expected, "rpush"); } { // lrange resp3::flat_array_int_type buffer; resp3::detail::basic_flat_array_adapter res{&buffer}; - co_await async_read_one(socket, buf, res); + co_await detail::async_read_one(socket, buf, res); check_equal(buffer, list, "lrange-1"); } { // lrange resp3::flat_array_int_type buffer; resp3::detail::basic_flat_array_adapter res{&buffer}; - co_await async_read_one(socket, buf, res); + co_await detail::async_read_one(socket, buf, res); check_equal(buffer, std::vector{3, 4, 5}, "lrange-2"); } @@ -502,7 +503,7 @@ test_list(net::ip::tcp::resolver::results_type const& results) resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {"OK"}} }; - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); check_equal(array_buffer, expected, "ltrim"); } @@ -512,14 +513,14 @@ test_list(net::ip::tcp::resolver::results_type const& results) resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_string, {"3"}} }; - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); check_equal(array_buffer, expected, "lpop"); } { // quit array_buffer.clear(); radapter.clear(); - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {"OK"}} }; check_equal(array_buffer, expected, "ltrim"); @@ -560,14 +561,14 @@ test_set(net::ip::tcp::resolver::results_type const& results) { // hello, flushall array_buffer.clear(); radapter.clear(); - co_await async_read_one(socket, buf, radapter); - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); } { // set array_buffer.clear(); radapter.clear(); - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {"OK"}} }; check_equal(array_buffer, expected, "set1"); @@ -579,14 +580,14 @@ test_set(net::ip::tcp::resolver::results_type const& results) resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_string, test_bulk1} }; - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); check_equal(array_buffer, expected, "get1"); } { // set array_buffer.clear(); radapter.clear(); - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {"OK"}} }; check_equal(array_buffer, expected, "ltrim"); @@ -597,14 +598,14 @@ test_set(net::ip::tcp::resolver::results_type const& results) radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_string, test_bulk2} }; - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); check_equal(array_buffer, expected, "get2"); } { // set array_buffer.clear(); radapter.clear(); - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {"OK"}} }; check_equal(array_buffer, expected, "set3"); @@ -616,14 +617,14 @@ test_set(net::ip::tcp::resolver::results_type const& results) resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_string, {}} }; - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); check_equal(array_buffer, expected, "get3"); } { // quit array_buffer.clear(); radapter.clear(); - co_await async_read_one(socket, buf, radapter); + co_await detail::async_read_one(socket, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {"OK"}} }; check_equal(array_buffer, expected, "quit"); @@ -647,7 +648,7 @@ net::awaitable test_simple_string() test_tcp_socket ts {cmd}; array_buffer.clear(); radapter.clear(); - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {"OK"}} }; check_equal(array_buffer, expected, "simple_string"); @@ -659,7 +660,7 @@ net::awaitable test_simple_string() test_tcp_socket ts {cmd}; array_buffer.clear(); radapter.clear(); - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_string, {}} }; check_equal(array_buffer, expected, "simple_string (empty)"); @@ -675,7 +676,7 @@ net::awaitable test_simple_string() // cmd += "\r\n"; // test_tcp_socket ts {cmd}; // resp3::detail::simple_string_adapter res; - // co_await async_read_one(ts, buffer, res); + // co_await detail::async_read_one(ts, buffer, res); // check_equal(res.result, str, "simple_string (large)"); // //check_equal(res.attribute.value, {}, "simple_string (empty attribute)"); //} @@ -692,7 +693,7 @@ net::awaitable test_number() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::number, {"-3"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "number (int)"); } @@ -703,7 +704,7 @@ net::awaitable test_number() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::number, {"3"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "number (unsigned)"); } @@ -714,7 +715,7 @@ net::awaitable test_number() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::number, {"1111111"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "number (std::size_t)"); } } @@ -734,7 +735,7 @@ net::awaitable test_array() , {1UL, 1UL, resp3::type::blob_string, {"two"}} , {1UL, 1UL, resp3::type::blob_string, {"three"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "array"); } @@ -743,7 +744,7 @@ net::awaitable test_array() test_tcp_socket ts {cmd}; resp3::flat_array_int_type buffer; resp3::flat_array_int_adapter res{&buffer}; - co_await async_read_one(ts, buf, res); + co_await detail::async_read_one(ts, buf, res); check_equal(buffer, {1, 2, 3}, "array (int)"); } @@ -754,7 +755,7 @@ net::awaitable test_array() radapter.clear(); resp3::response::storage_type expected { {0UL, 0UL, resp3::type::array, {}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "array (empty)"); } } @@ -770,7 +771,7 @@ net::awaitable test_blob_string() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_string, {"hh"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "blob_string"); } @@ -781,7 +782,7 @@ net::awaitable test_blob_string() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_string, {"hhaa\aaaa\raaaaa\r\naaaaaaaaaa"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "blob_string (with separator)"); } @@ -792,7 +793,7 @@ net::awaitable test_blob_string() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_string, {}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "blob_string (size 0)"); } } @@ -808,7 +809,7 @@ net::awaitable test_simple_error() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::simple_error, {"Error"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "simple_error (message)"); } } @@ -824,7 +825,7 @@ net::awaitable test_floating_point() auto* adapter = resp.select_adapter(resp3::type::doublean); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::doublean, {"1.23"}} }; - co_await async_read_one(ts, buf, *adapter); + co_await detail::async_read_one(ts, buf, *adapter); check_equal(resp.raw(), expected, "double"); } @@ -833,7 +834,7 @@ net::awaitable test_floating_point() test_tcp_socket ts {cmd}; resp3::response resp; auto* adapter = resp.select_adapter(resp3::type::doublean); - co_await async_read_one(ts, buf, *adapter); + co_await detail::async_read_one(ts, buf, *adapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::doublean, {"inf"}} }; check_equal(resp.raw(), expected, "double (inf)"); @@ -844,7 +845,7 @@ net::awaitable test_floating_point() test_tcp_socket ts {cmd}; resp3::response resp; auto* adapter = resp.select_adapter(resp3::type::doublean); - co_await async_read_one(ts, buf, *adapter); + co_await detail::async_read_one(ts, buf, *adapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::doublean, {"-inf"}} }; check_equal(resp.raw(), expected, "double (-inf)"); @@ -864,7 +865,7 @@ net::awaitable test_boolean() resp3::response::storage_type expected { {1UL, 0UL, resp3::type::boolean, {"f"}} }; - co_await async_read_one(ts, buf, *adapter); + co_await detail::async_read_one(ts, buf, *adapter); check_equal(resp.raw(), expected, "bool (false)"); } @@ -876,7 +877,7 @@ net::awaitable test_boolean() resp3::response::storage_type expected { {1UL, 0UL, resp3::type::boolean, {"t"}} }; - co_await async_read_one(ts, buf, *adapter); + co_await detail::async_read_one(ts, buf, *adapter); check_equal(resp.raw(), expected, "bool (true)"); } } @@ -892,7 +893,7 @@ net::awaitable test_blob_error() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_error, {"SYNTAX invalid syntax"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "blob_error (message)"); } @@ -904,7 +905,7 @@ net::awaitable test_blob_error() resp3::response::storage_type expected { {1UL, 0UL, resp3::type::blob_error, {}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "blob_error (empty message)"); } } @@ -920,7 +921,7 @@ net::awaitable test_verbatim_string() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::verbatim_string, {"txt:Some string"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "verbatim_string"); } @@ -929,7 +930,7 @@ net::awaitable test_verbatim_string() test_tcp_socket ts {cmd}; array_buffer.clear(); radapter.clear(); - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::verbatim_string, {}} }; check_equal(array_buffer, expected, "verbatim_string (empty)"); @@ -954,7 +955,7 @@ net::awaitable test_set2() , { 1UL, 1UL, resp3::type::simple_string, {"three"}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "test set (1)"); } @@ -967,7 +968,7 @@ net::awaitable test_set2() { { 0UL, 0UL, resp3::type::set, {}} }; - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "test set (2)"); } } @@ -981,7 +982,7 @@ net::awaitable test_map() test_tcp_socket ts {cmd}; array_buffer.clear(); radapter.clear(); - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); resp3::response::storage_type expected { {14UL, 0UL, resp3::type::map, {}} @@ -1008,7 +1009,7 @@ net::awaitable test_map() test_tcp_socket ts {cmd}; array_buffer.clear(); radapter.clear(); - co_await async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); resp3::response::storage_type expected { {0UL, 0UL, resp3::type::map, {}} }; check_equal(array_buffer, expected, "test map (empty)"); @@ -1026,7 +1027,7 @@ net::awaitable test_streamed_string() radapter.clear(); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::streamed_string_part, {"Hello world"}} }; - co_await resp3::async_read_one(ts, buf, radapter); + co_await detail::async_read_one(ts, buf, radapter); check_equal(array_buffer, expected, "streamed string"); } @@ -1035,7 +1036,7 @@ net::awaitable test_streamed_string() test_tcp_socket ts {cmd}; resp3::response resp; auto* adapter = resp.select_adapter(resp3::type::streamed_string_part, command::unknown, {}); - co_await resp3::async_read_one(ts, buf, *adapter); + co_await detail::async_read_one(ts, buf, *adapter); resp3::response::storage_type expected { {1UL, 0UL, resp3::type::streamed_string_part, {}} }; @@ -1050,7 +1051,7 @@ net::awaitable offline() // std::string cmd {"|1\r\n+key-popularity\r\n%2\r\n$1\r\na\r\n,0.1923\r\n$1\r\nb\r\n,0.0012\r\n"}; // test_tcp_socket ts {cmd}; // resp3::flat_radapter res; - // co_await async_read_one(ts, buf, res); + // co_await detail::async_read_one(ts, buf, res); // check_equal(res.result, {"key-popularity", "a", "0.1923", "b", "0.0012"}, "attribute"); //} @@ -1058,7 +1059,7 @@ net::awaitable offline() // std::string cmd {">4\r\n+pubsub\r\n+message\r\n+foo\r\n+bar\r\n"}; // test_tcp_socket ts {cmd}; // resp3::flat_radapter res; - // co_await async_read_one(ts, buf, res); + // co_await detail::async_read_one(ts, buf, res); // check_equal(res.result, {"pubsub", "message", "foo", "bar"}, "push type"); //} @@ -1066,7 +1067,7 @@ net::awaitable offline() // std::string cmd {">0\r\n"}; // test_tcp_socket ts {cmd}; // resp3::flat_radapter res; - // co_await async_read_one(ts, buf, res); + // co_await detail::async_read_one(ts, buf, res); // check_equal(res.result, {}, "push type (empty)"); //} }