From 187c1e6a0eb7fcc388f3bf09aee19bfd21782b86 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 30 Oct 2021 20:55:19 +0200 Subject: [PATCH] More refactoring. --- README.md | 15 +-- examples/ping.cpp | 8 +- examples/pubsub.cpp | 16 +-- include/aedis/aedis.hpp | 2 +- include/aedis/command.hpp | 12 ++- include/aedis/impl/command.ipp | 4 +- include/aedis/resp3/detail/composer.hpp | 98 ++++++++----------- include/aedis/resp3/detail/impl/composer.ipp | 10 ++ include/aedis/resp3/detail/read.hpp | 8 +- include/aedis/resp3/detail/write.hpp | 4 +- include/aedis/resp3/impl/request.ipp | 8 -- include/aedis/resp3/request.hpp | 83 ++++++++-------- include/aedis/resp3/response.hpp | 4 +- .../resp3/{connection.hpp => stream.hpp} | 2 +- tests/general.cpp | 27 ++--- 15 files changed, 142 insertions(+), 159 deletions(-) rename include/aedis/resp3/{connection.hpp => stream.hpp} (98%) diff --git a/README.md b/README.md index 1c7b0c0e..d6b7bce3 100644 --- a/README.md +++ b/README.md @@ -23,15 +23,18 @@ net::awaitable ping() std::queue requests; requests.push({}); - requests.back().hello(); - requests.back().ping(); - requests.back().quit(); + requests.back().push(command::hello, 3); + requests.back().push(command::ping); + requests.back().push(command::quit); - resp3::consumer cs; + resp3::stream s; for (;;) { resp3::response resp; - co_await cs.async_consume(socket, requests, resp); - std::cout << requests.front().elements.front() << "\n" << resp << std::endl; + co_await s.async_consume(socket, requests, resp); + + std::cout + << requests.front().commands.front() << "\n" + << resp << std::endl; } } ``` diff --git a/examples/ping.cpp b/examples/ping.cpp index aa636a79..184136b0 100644 --- a/examples/ping.cpp +++ b/examples/ping.cpp @@ -30,17 +30,17 @@ net::awaitable ping() std::queue requests; requests.push({}); - requests.back().push(command::hello, "3"); + requests.back().push(command::hello, 3); requests.back().push(command::ping); requests.back().push(command::quit); - resp3::connection conn; + resp3::stream s; for (;;) { resp3::response resp; - co_await conn.async_consume(socket, requests, resp); + co_await s.async_consume(socket, requests, resp); std::cout - << requests.front().elements.front() << "\n" + << requests.front().commands.front() << "\n" << resp << std::endl; } } diff --git a/examples/pubsub.cpp b/examples/pubsub.cpp index 77319f7e..000f6a5a 100644 --- a/examples/pubsub.cpp +++ b/examples/pubsub.cpp @@ -21,14 +21,14 @@ net::awaitable publisher() std::queue requests; requests.push({}); - requests.back().push(command::hello, "3"); + requests.back().push(command::hello, 3); - resp3::connection conn; + resp3::stream s; for (;;) { resp3::response resp; - co_await conn.async_consume(socket, requests, resp); + co_await s.async_consume(socket, requests, resp); - if (requests.front().elements.front().cmd == command::hello) { + if (requests.front().commands.front() == command::hello) { prepare_next(requests); requests.back().push(command::publish, "channel1", "Message to channel1"); requests.back().push(command::publish, "channel2", "Message to channel2"); @@ -48,20 +48,20 @@ net::awaitable subscriber() requests.push({}); requests.back().push(command::hello, "3"); - resp3::connection conn; + resp3::stream s; for (;;) { resp3::response resp; - co_await conn.async_consume(socket, requests, resp); + co_await s.async_consume(socket, requests, resp); if (resp.get_type() == resp3::type::push) { std::cout << "Subscriber " << id << ":\n" << resp << std::endl; continue; } - if (requests.front().elements.front().cmd == command::hello) { + if (requests.front().commands.front() == command::hello) { id = resp.raw().at(8).data; prepare_next(requests); - requests.back().subscribe({"channel1", "channel2"}); + requests.back().push(command::subscribe, "channel1", "channel2"); } } } diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index 11610ded..df79f48d 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -9,5 +9,5 @@ #include #include -#include +#include #include diff --git a/include/aedis/command.hpp b/include/aedis/command.hpp index c0ef9ab5..cb6a1347 100644 --- a/include/aedis/command.hpp +++ b/include/aedis/command.hpp @@ -39,6 +39,7 @@ enum class command , client_id /// https://redis.io/commands/del , del + /// https://redis.io/commands/exec , exec /// https://redis.io/commands/expire , expire @@ -66,6 +67,7 @@ enum class command , hvals /// https://redis.io/commands/hdel , hdel + /// https://redis.io/commands/incr , incr /// https://redis.io/commands/keys , keys @@ -79,11 +81,15 @@ enum class command , lrange /// https://redis.io/commands/ltrim , ltrim + /// https://redis.io/commands/multi , multi + /// https://redis.io/commands/ping , ping + /// https://redis.io/commands/psubscribe , psubscribe /// https://redis.io/commands/publish , publish + /// https://redis.io/commands/quit , quit /// https://redis.io/commands/role , role @@ -93,6 +99,7 @@ enum class command , sadd /// https://redis.io/commands/scard , scard + /// https://redis.io/commands/sdiff , sdiff /// https://redis.io/commands/sentinel , sentinel @@ -100,21 +107,24 @@ enum class command , set /// https://redis.io/commands/smembers , smembers + /// https://redis.io/commands/subscribe , subscribe /// https://redis.io/commands/unsubscribe , unsubscribe /// https://redis.io/commands/zadd , zadd + /// https://redis.io/commands/zrange , zrange /// https://redis.io/commands/zrangebyscore , zrangebyscore /// https://redis.io/commands/zremrangebyscore , zremrangebyscore + /// Invalid or unknown command. , unknown }; /// Converts the command to a string. -char const* as_string(command c); +char const* to_string(command c); /** Writes the text representation of the command to the output * stream. diff --git a/include/aedis/impl/command.ipp b/include/aedis/impl/command.ipp index c177bdc0..5d866c6b 100644 --- a/include/aedis/impl/command.ipp +++ b/include/aedis/impl/command.ipp @@ -11,7 +11,7 @@ namespace aedis { -char const* as_string(command c) +char const* to_string(command c) { static char const* table[] = { "ACL LOAD" @@ -81,7 +81,7 @@ char const* as_string(command c) std::ostream& operator<<(std::ostream& os, command c) { - os << as_string(c); + os << to_string(c); return os; } diff --git a/include/aedis/resp3/detail/composer.hpp b/include/aedis/resp3/detail/composer.hpp index bba3fad4..31690124 100644 --- a/include/aedis/resp3/detail/composer.hpp +++ b/include/aedis/resp3/detail/composer.hpp @@ -7,29 +7,57 @@ #pragma once -#include -#include #include -#include -#include -#include -#include #include #include -#include namespace aedis { namespace resp3 { namespace detail { +template +struct needs_to_string { + static constexpr auto value = true; +}; + +template <> +struct needs_to_string { + static constexpr auto value = false; +}; + +template <> +struct needs_to_string { + static constexpr auto value = false; +}; + +template <> +struct needs_to_string { + static constexpr auto value = false; +}; + +template <> +struct needs_to_string { + static constexpr auto value = false; +}; + +template +struct needs_to_string { + static constexpr auto value = false; +}; + +template +struct needs_to_string { + static constexpr auto value = false; +}; + void add_header(std::string& to, int size); void add_bulk(std::string& to, std::string_view param); -// Overlaod for integer or floating point data types. template -void add_bulk(std::string& to, T data, typename std::enable_if<(std::is_integral::value || std::is_floating_point::value), bool>::type = false) +void add_bulk(std::string& to, T const& data, typename std::enable_if::value, bool>::type = false) { - auto const s = std::to_string(data); + using std::to_string; + auto const s = to_string(data); add_bulk(to, s); } @@ -51,55 +79,7 @@ struct value_type_size> { static constexpr auto size = 2U; }; -template -void assemble( std::string& to - , std::string_view cmd - , Iter1 begin1 - , Iter1 end1 - , Iter2 begin2 - , Iter2 end2) -{ - using value_type1 = typename std::iterator_traits::value_type; - using value_type2 = typename std::iterator_traits::value_type; - - auto const f1 = value_type_size::size; - auto const f2 = value_type_size::size; - - auto const d1 = std::distance(begin1, end1); - auto const d2 = std::distance(begin2, end2); - - add_header(to, 1 + f1 * d1 + f2 * d2); - add_bulk(to, cmd); - - for (; begin1 != end1; ++begin1) - add_bulk(to, *begin1); - - for (; begin2 != end2; ++begin2) - add_bulk(to, *begin2); -} - -template -void assemble(std::string& to, std::string_view cmd, Range const& range) -{ - using std::cbegin; - using std::cend; - std::initializer_list dummy = {}; - detail::assemble(to, cmd, cbegin(range), cend(range), cbegin(dummy), cend(dummy)); -} - -template -void assemble(std::string& to, std::string_view cmd, Range1 const& range1, Range2 const& range2) -{ - using std::cbegin; - using std::cend; - detail::assemble(to, cmd, cbegin(range1), cend(range1), cbegin(range2), cend(range2)); -} - -template -constexpr decltype(auto) front(Tp&& t, Us&&...) noexcept -{ - return std::forward(t); -} +bool has_push_response(command cmd); } // detail } // resp3 diff --git a/include/aedis/resp3/detail/impl/composer.ipp b/include/aedis/resp3/detail/impl/composer.ipp index 611a992f..4f27c314 100644 --- a/include/aedis/resp3/detail/impl/composer.ipp +++ b/include/aedis/resp3/detail/impl/composer.ipp @@ -27,6 +27,16 @@ void add_bulk(std::string& to, std::string_view data) to += "\r\n"; } +bool has_push_response(command cmd) +{ + switch (cmd) { + case command::subscribe: + case command::unsubscribe: + case command::psubscribe: return true; + default: return false; + } +}; + } // detail } // resp3 } // aedis diff --git a/include/aedis/resp3/detail/read.hpp b/include/aedis/resp3/detail/read.hpp index d8136d42..5421c39e 100644 --- a/include/aedis/resp3/detail/read.hpp +++ b/include/aedis/resp3/detail/read.hpp @@ -192,8 +192,8 @@ struct consumer_op { auto* adapter = resp.select_adapter(m_type); async_read_one(stream, buffer, *adapter, std::move(self)); } else { - auto const& elem = requests.front().elements.front(); - auto* adapter = resp.select_adapter(m_type, elem); + auto const cmd = requests.front().commands.front(); + auto* adapter = resp.select_adapter(m_type, cmd); async_read_one(stream, buffer, *adapter, std::move(self)); } } @@ -206,9 +206,9 @@ struct consumer_op { yield self.complete(ec, m_type); if (m_type != type::push) - requests.front().elements.pop(); + requests.front().commands.pop(); - } while (!std::empty(requests) && !std::empty(requests.front().elements)); + } while (!std::empty(requests) && !std::empty(requests.front().commands)); if (!std::empty(requests)) requests.pop(); } while (std::empty(requests)); diff --git a/include/aedis/resp3/detail/write.hpp b/include/aedis/resp3/detail/write.hpp index 1a1ffe13..5f3d7d0b 100644 --- a/include/aedis/resp3/detail/write.hpp +++ b/include/aedis/resp3/detail/write.hpp @@ -76,13 +76,13 @@ struct write_some_op { if (ec) break; - if (std::empty(requests.front().elements)) { + if (std::empty(requests.front().commands)) { // We only pop when all commands in the pipeline have push // responses like subscribe, otherwise, pop is done when the // response arrives. requests.pop(); } - } while (!std::empty(requests) && std::empty(requests.front().elements)); + } while (!std::empty(requests) && std::empty(requests.front().commands)); self.complete(ec); } diff --git a/include/aedis/resp3/impl/request.ipp b/include/aedis/resp3/impl/request.ipp index a915ecff..3e32bb07 100644 --- a/include/aedis/resp3/impl/request.ipp +++ b/include/aedis/resp3/impl/request.ipp @@ -19,13 +19,5 @@ bool prepare_next(std::queue& reqs) return cond; } -std::ostream& operator<<(std::ostream& os, request::element const& e) -{ - os << e.cmd; - if (!std::empty(e.key)) - os << "(" << e.key << ")"; - return os; -} - } // resp3 } // aedis diff --git a/include/aedis/resp3/request.hpp b/include/aedis/resp3/request.hpp index c500ec8d..c7f5cd4b 100644 --- a/include/aedis/resp3/request.hpp +++ b/include/aedis/resp3/request.hpp @@ -20,7 +20,7 @@ namespace aedis { namespace resp3 { -/** A Redis request also referred to as a pipeline. +/** A Redis request (also referred to as a pipeline). * * A request is composed of one or more redis commands and is * refered to in the redis documentation as a pipeline, see @@ -31,19 +31,17 @@ namespace resp3 { */ class request { public: - struct element { - command cmd; - std::string key; - }; std::string payload; - std::queue elements; + std::queue commands; public: - /// Return the size of the pipeline. i.e. how many commands it - /// contains. + /** Returns the size of the command pipeline. i.e. how many commands it contains. + */ auto size() const noexcept - { return std::size(elements); } + { return std::size(commands); } + /** Return true if the request contains no commands. + */ bool empty() const noexcept { return std::empty(payload); }; @@ -51,9 +49,11 @@ public: void clear() { payload.clear(); - elements = {}; + commands = {}; } + /** Appends a new command to the request. + */ template void push(command cmd, Ts const&... args) { @@ -63,53 +63,56 @@ public: auto constexpr pack_size = sizeof...(Ts); detail::add_header(payload, 1 + pack_size); - // TODO: as_string is not a good idea, better to_string. - detail::add_bulk(payload, as_string(cmd)); + detail::add_bulk(payload, to_string(cmd)); (detail::add_bulk(payload, args), ...); - // TODO: Do not assume the front is convertible to a string. - // TODO: Is it correct to use the front as the key. - std::string_view key; - if constexpr (pack_size != 0) - key = detail::front(args...); - - elements.emplace(cmd, std::string{key}); + if (!detail::has_push_response(cmd)) + commands.emplace(cmd); } - template - void push_range(command cmd, std::string_view key, ForwardIterator begin, ForwardIterator end) + /** Appends a new command to the request. + */ + template + void push_range(command cmd, Key const& key, ForwardIterator begin, ForwardIterator end) { - // TODO: For hset find a way to assert the value type is a pair. + // Note: For some commands like hset it would be a good idea to assert + // the value type is a pair. + using value_type = typename std::iterator_traits::value_type; auto constexpr size = detail::value_type_size::size; auto const distance = std::distance(begin, end); detail::add_header(payload, 2 + size * distance); - detail::add_bulk(payload, as_string(cmd)); + detail::add_bulk(payload, to_string(cmd)); detail::add_bulk(payload, key); for (; begin != end; ++begin) detail::add_bulk(payload, *begin); - elements.emplace(cmd, std::string{key}); + if (!detail::has_push_response(cmd)) + commands.emplace(cmd); } - /// Adds subscribe to the request, see https://redis.io/commands/subscribe - void subscribe(std::initializer_list keys) + /** Appends a new command to the request. + */ + template + void push_range(command cmd, ForwardIterator begin, ForwardIterator end) { - // The response to this command is a push type. - detail::assemble(payload, "SUBSCRIBE", keys); - } + // Note: For some commands like hset it would be a good idea to assert + // the value type is a pair. - void psubscribe(std::initializer_list keys) - { - detail::assemble(payload, "PSUBSCRIBE", keys); - } - - void unsubscribe(std::string_view key) - { - // The response to this command is a push type. - detail::assemble(payload, "UNSUBSCRIBE", key); + using value_type = typename std::iterator_traits::value_type; + + auto constexpr size = detail::value_type_size::size; + auto const distance = std::distance(begin, end); + detail::add_header(payload, 1 + size * distance); + detail::add_bulk(payload, to_string(cmd)); + + for (; begin != end; ++begin) + detail::add_bulk(payload, *begin); + + if (!detail::has_push_response(cmd)) + commands.emplace(cmd); } }; @@ -118,9 +121,5 @@ public: */ bool prepare_next(std::queue& reqs); -/** Writes the request element as a string to the stream. - */ -std::ostream& operator<<(std::ostream& os, request::element const& r); - } // resp3 } // aedis diff --git a/include/aedis/resp3/response.hpp b/include/aedis/resp3/response.hpp index 7db98498..fdefb6db 100644 --- a/include/aedis/resp3/response.hpp +++ b/include/aedis/resp3/response.hpp @@ -85,9 +85,7 @@ public: * your own response types. */ virtual adapter* - select_adapter( - resp3::type t, - request::element const& e = {command::unknown, std::string{}}) + select_adapter(resp3::type t, command cmd = command::unknown) { return &adapter_; } auto const& raw() const noexcept {return data_;} diff --git a/include/aedis/resp3/connection.hpp b/include/aedis/resp3/stream.hpp similarity index 98% rename from include/aedis/resp3/connection.hpp rename to include/aedis/resp3/stream.hpp index a010db76..5e9a688c 100644 --- a/include/aedis/resp3/connection.hpp +++ b/include/aedis/resp3/stream.hpp @@ -19,7 +19,7 @@ namespace resp3 { /** Reads and writes redis commands. */ -struct connection { +struct stream { std::string buffer; net::coroutine coro = net::coroutine(); type t = type::invalid; diff --git a/tests/general.cpp b/tests/general.cpp index fcce0f87..af11b657 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -48,15 +48,6 @@ void check_equal(T const& a, T const& b, std::string const& msg = "") std::cout << "Error: " << msg << std::endl; } -template -void check_equal_number(T const& a, T const& b, std::string const& msg = "") -{ - if (a == b) - std::cout << "Success: " << msg << std::endl; - else - std::cout << "Error: " << a << " != " << b << " " << msg << std::endl; -} - //------------------------------------------------------------------- struct test_general_fill { @@ -76,7 +67,7 @@ struct test_general_fill { p.push(command::get, "b"); p.push(command::append, "b", "b"); p.push(command::del, "b"); - p.subscribe({"channel"}); + p.push(command::subscribe, "channel"); p.push(command::publish, "channel", "message"); p.push(command::incr, "3"); @@ -126,17 +117,17 @@ test_general(net::ip::tcp::resolver::results_type const& res) std::queue requests; requests.push({}); - requests.back().push(command::hello, "3"); + requests.back().push(command::hello, 3); test_general_fill filler; resp3::response resp; - resp3::connection cs; + resp3::stream s; int push_counter = 0; for (;;) { resp.clear(); - co_await cs.async_consume(socket, requests, resp, net::use_awaitable); + co_await s.async_consume(socket, requests, resp, net::use_awaitable); if (resp.get_type() == resp3::type::push) { switch (push_counter) { @@ -168,9 +159,9 @@ test_general(net::ip::tcp::resolver::results_type const& res) continue; } - auto const& elem = requests.front().elements.front(); + auto const cmd = requests.front().commands.front(); - switch (elem.cmd) { + switch (cmd) { case command::hello: { prepare_next(requests); @@ -432,7 +423,7 @@ test_general(net::ip::tcp::resolver::results_type const& res) }; check_equal(resp.raw(), expected, "smembers (value)"); } break; - default: { std::cout << "Error: " << resp.get_type() << " " << elem.cmd << std::endl; } + default: { std::cout << "Error: " << resp.get_type() << " " << cmd << std::endl; } } resp.raw().clear(); @@ -447,7 +438,7 @@ test_list(net::ip::tcp::resolver::results_type const& results) std::vector list {1 ,2, 3, 4, 5, 6}; resp3::request p; - p.push(command::hello, "3"); + p.push(command::hello, 3); p.push(command::flushall); p.push_range(command::rpush, "a", std::cbegin(list), std::cend(list)); p.push(command::lrange, "a", 0, -1); @@ -548,7 +539,7 @@ test_set(net::ip::tcp::resolver::results_type const& results) co_await async_connect(socket, results); resp3::request p; - p.push(command::hello, "3"); + p.push(command::hello, 3); p.push(command::flushall); p.push(command::set, "s", test_bulk1); p.push(command::get, "s");