From 2ed19ee124a3260382766f5e0e4bef0841b5381f Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 28 Aug 2021 09:56:16 +0200 Subject: [PATCH] Refactoring. --- examples/async_basic.cpp | 5 +- include/aedis/aedis.hpp | 1 - include/aedis/connection.hpp | 154 ------ include/aedis/detail/response_adapters.hpp | 27 +- include/aedis/impl/connection.ipp | 38 -- include/aedis/impl/read.ipp | 55 -- include/aedis/impl/src.hpp | 1 - include/aedis/impl/type.ipp | 1 + include/aedis/read.hpp | 36 +- include/aedis/type.hpp | 2 +- tests/general.cpp | 595 ++++++++------------- 11 files changed, 259 insertions(+), 656 deletions(-) delete mode 100644 include/aedis/connection.hpp delete mode 100644 include/aedis/impl/connection.ipp diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 27e88000..1e0a93cf 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -27,7 +27,6 @@ void print_helper(command cmd, resp3::type type, response_buffers& bufs) net::awaitable reader(net::ip::tcp::socket& socket, std::queue& reqs) { - response_buffers bufs; std::string buffer; prepare_queue(reqs); @@ -35,9 +34,9 @@ reader(net::ip::tcp::socket& socket, std::queue& reqs) co_await async_write(socket, net::buffer(reqs.back().payload), net::use_awaitable); - detail::response_adapters adapters{bufs}; + response_buffers bufs; for (;;) { - auto const event = co_await async_consume(socket, buffer, adapters, reqs); + auto const event = co_await async_read(socket, buffer, bufs, reqs); switch (event.first) { case command::hello: diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index b4d203bc..be0526da 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -9,7 +9,6 @@ #include #include -#include #include #include #include diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp deleted file mode 100644 index 0b791de6..00000000 --- a/include/aedis/connection.hpp +++ /dev/null @@ -1,154 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#pragma once - -#include -#include - -#include - -#include "net.hpp" -#include "read.hpp" -#include "write.hpp" -#include "type.hpp" -#include "pipeline.hpp" - -namespace aedis { - -/** A class that keeps a connection to the redis server. -*/ -class connection : public std::enable_shared_from_this { -public: - /** Redis server configuration. - */ - struct config { - /// Redis host. - std::string host; - - /// Redis port. - std::string port; - - /** The maximum pipeline size. Once this size is exceeded a new - * pipeline is added to the queue. - */ - int max_pipeline_size = 1000; - - /** The maximum pipeline payload size. Once this size is - * exceeded a new pipeline is added to the queue. - */ - int max_payload_size = 10000; - }; - -private: - net::ip::tcp::socket socket_; - std::string buffer_; - std::queue reqs_; - config conf_; - - template - net::awaitable worker_coro(Receiver receiver, response_buffers& bufs) - { - try { - auto ex = co_await net::this_coro::executor; - - net::ip::tcp::resolver resolver{ex}; - auto const res = resolver.resolve(conf_.host, conf_.port); - - co_await async_connect(socket_, res, net::use_awaitable); - - pipeline p; - p.hello("3"); - reqs_.push(p); - co_await async_write(socket_, net::buffer(p.payload), net::use_awaitable); - - detail::response_adapters adapters{bufs}; - for (;;) { - auto const event = co_await async_consume(socket_, buffer_, adapters, reqs_); - receiver(event.first, event.second); - } - } catch (...) { - } - } - -public: - /// Contructs a connection. - connection( - net::any_io_executor const& ioc, - config const& conf = config {"127.0.0.1", "6379", 1000, 10000}); - - /// Stablishes the connection with the redis server. - void start(receiver_base& recv, response_buffers& bufs); - - template - void run(Receiver receiver, response_buffers& bufs) - { - auto self = this->shared_from_this(); - - auto f = [self, receiver, &bufs] () mutable - { return self->worker_coro(receiver, bufs); }; - - net::co_spawn(socket_.get_executor(), f, net::detached); - } - - /** Adds commands to the ouput queue. The Filler signature must be - * - * void f(request& req) - */ - template - bool send(Filler filler) - { - auto const empty = std::empty(reqs_); - if (empty || std::size(reqs_) == 1) - reqs_.push({}); - - auto const pipeline_size = std::ssize(reqs_.back()); - auto const payload_size = std::ssize(reqs_.back().payload); - if (pipeline_size > conf_.max_pipeline_size || payload_size > conf_.max_payload_size) - reqs_.push({}); - - filler(reqs_.back()); - - if (empty) { - co_spawn( - socket_.get_executor(), - async_write_all(socket_, reqs_), - net::detached); - } - - return empty; - } - - auto queue_size() const noexcept - { return std::ssize(reqs_); } - - /// Adds ping to the request, see https://redis.io/commands/bgrewriteaof - void ping() - { send([](auto& req){ req.ping();}); } - - /// Adds ping to the request, see https://redis.io/commands/psubscribe - void psubscribe(std::initializer_list l) - { send([&](auto& req){ req.psubscribe(l);}); } - - /// Adds quit to the request, see https://redis.io/commands/quit - void quit() - { send([](auto& req){ req.quit();}); } - - /// Adds multi to the request, see https://redis.io/commands/multi - void multi() - { send([](auto& req){ req.multi();}); } - - /// Adds multi to the request, see https://redis.io/commands/exec - void exec() - { send([](auto& req){ req.exec();}); } - - /// Adds incr to the request, see https://redis.io/commands/incr - void incr(std::string_view key) - { send([&](auto& req){ req.incr(key);}); } -}; - -} // aedis diff --git a/include/aedis/detail/response_adapters.hpp b/include/aedis/detail/response_adapters.hpp index 928108d7..943acec7 100644 --- a/include/aedis/detail/response_adapters.hpp +++ b/include/aedis/detail/response_adapters.hpp @@ -150,7 +150,7 @@ struct response_simple_string : public response_adapter_base { response_simple_string(resp3::simple_string* p) : result(p) {} void on_simple_string(std::string_view s) override - { from_string_view(s, *result); } + { *result = s; } }; struct response_simple_error : public response_adapter_base { @@ -159,7 +159,7 @@ struct response_simple_error : public response_adapter_base { response_simple_error(resp3::simple_error* p) : result(p) {} void on_simple_error(std::string_view s) override - { from_string_view(s, *result); } + { *result = s; } }; struct response_big_number : public response_adapter_base { @@ -231,15 +231,27 @@ struct response_unordered_set : response_adapter_base { template struct response_basic_array : response_adapter_base { + int i = 0; resp3::basic_array* result = nullptr; response_basic_array(resp3::basic_array* p) : result(p) {} void add(std::string_view s = {}) { - T r; - from_string_view(s, r); - result->push_back(std::move(r)); + from_string_view(s, result->at(i)); + ++i; + } + + void select_array(int n) override + { + i = 0; + result->resize(n); + } + + void select_push(int n) override + { + i = 0; + result->resize(n); } // TODO: Call vector reserve. @@ -250,14 +262,13 @@ struct response_basic_array : response_adapter_base { void on_big_number(std::string_view s) override { add(s); } void on_verbatim_string(std::string_view s = {}) override { add(s); } void on_blob_string(std::string_view s = {}) override { add(s); } - void select_array(int n) override { } void select_set(int n) override { } void select_map(int n) override { } - void select_push(int n) override { } void on_streamed_string_part(std::string_view s = {}) override { add(s); } }; using response_array = response_basic_array; +using response_push = response_basic_array; struct response_map : response_adapter_base { resp3::map* result = nullptr; @@ -352,7 +363,7 @@ struct response_basic_static_map : response_adapter_base { struct response_adapters { response_tree resp_transaction; response_array resp_array; - response_array resp_push; + response_push resp_push; response_set resp_set; response_map resp_map; response_array resp_attribute; diff --git a/include/aedis/impl/connection.ipp b/include/aedis/impl/connection.ipp deleted file mode 100644 index 2ddf7241..00000000 --- a/include/aedis/impl/connection.ipp +++ /dev/null @@ -1,38 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include -#include -#include - -namespace aedis { - -connection::connection(net::any_io_executor const& ioc, config const& conf) -: socket_{ioc} -, conf_{conf} -{ -} - -void connection::start(receiver_base& recv, response_buffers& bufs) -{ - auto self = this->shared_from_this(); - - auto receiver = [&](auto cmd, auto type) - { - switch (type) { - case resp3::type::push: recv.on_push(); break; - default: forward(cmd, type, recv); - } - }; - - auto f = [self, receiver, &bufs] () mutable - { return self->worker_coro(receiver, bufs); }; - - net::co_spawn(socket_.get_executor(), f, net::detached); -} - -} // aedis diff --git a/include/aedis/impl/read.ipp b/include/aedis/impl/read.ipp index 9453d056..15d62a89 100644 --- a/include/aedis/impl/read.ipp +++ b/include/aedis/impl/read.ipp @@ -7,63 +7,8 @@ #include -#define EXPAND_RECEIVER_CASE(type, cmd) case command::cmd: recv.on_##cmd(type); break - namespace aedis { -void -forward( - command cmd, - resp3::type type, - receiver_base& recv) -{ - switch (cmd) { - EXPAND_RECEIVER_CASE(type, acl_cat); - EXPAND_RECEIVER_CASE(type, acl_deluser); - EXPAND_RECEIVER_CASE(type, acl_genpass); - EXPAND_RECEIVER_CASE(type, acl_getuser); - EXPAND_RECEIVER_CASE(type, acl_help); - EXPAND_RECEIVER_CASE(type, acl_list); - EXPAND_RECEIVER_CASE(type, acl_load); - EXPAND_RECEIVER_CASE(type, acl_log); - EXPAND_RECEIVER_CASE(type, acl_save); - EXPAND_RECEIVER_CASE(type, acl_setuser); - EXPAND_RECEIVER_CASE(type, acl_users); - EXPAND_RECEIVER_CASE(type, acl_whoami); - EXPAND_RECEIVER_CASE(type, append); - EXPAND_RECEIVER_CASE(type, del); - EXPAND_RECEIVER_CASE(type, exec); - EXPAND_RECEIVER_CASE(type, expire); - EXPAND_RECEIVER_CASE(type, flushall); - EXPAND_RECEIVER_CASE(type, get); - EXPAND_RECEIVER_CASE(type, hdel); - EXPAND_RECEIVER_CASE(type, hello); - EXPAND_RECEIVER_CASE(type, hget); - EXPAND_RECEIVER_CASE(type, hgetall); - EXPAND_RECEIVER_CASE(type, hincrby); - EXPAND_RECEIVER_CASE(type, hset); - EXPAND_RECEIVER_CASE(type, hvals); - EXPAND_RECEIVER_CASE(type, incr); - EXPAND_RECEIVER_CASE(type, llen); - EXPAND_RECEIVER_CASE(type, lpop); - EXPAND_RECEIVER_CASE(type, lrange); - EXPAND_RECEIVER_CASE(type, ltrim); - EXPAND_RECEIVER_CASE(type, multi); - EXPAND_RECEIVER_CASE(type, ping); - EXPAND_RECEIVER_CASE(type, publish); - EXPAND_RECEIVER_CASE(type, quit); - EXPAND_RECEIVER_CASE(type, rpush); - EXPAND_RECEIVER_CASE(type, sadd); - EXPAND_RECEIVER_CASE(type, set); - EXPAND_RECEIVER_CASE(type, smembers); - EXPAND_RECEIVER_CASE(type, zadd); - EXPAND_RECEIVER_CASE(type, zrange); - EXPAND_RECEIVER_CASE(type, zrangebyscore); - EXPAND_RECEIVER_CASE(type, zremrangebyscore); - default: {assert(false);} - } -} - response_adapter_base* select_buffer(detail::response_adapters& adapters, resp3::type type, command cmd) { if (type == resp3::type::push) diff --git a/include/aedis/impl/src.hpp b/include/aedis/impl/src.hpp index ddd222a4..638bce87 100644 --- a/include/aedis/impl/src.hpp +++ b/include/aedis/impl/src.hpp @@ -6,7 +6,6 @@ */ #include -#include #include #include diff --git a/include/aedis/impl/type.ipp b/include/aedis/impl/type.ipp index ab5f7ca6..f01a3263 100644 --- a/include/aedis/impl/type.ipp +++ b/include/aedis/impl/type.ipp @@ -33,6 +33,7 @@ std::string to_string(type t) EXPAND_TYPE_CASE(blob_string); EXPAND_TYPE_CASE(streamed_string_part); EXPAND_TYPE_CASE(invalid); + EXPAND_TYPE_CASE(transaction); default: assert(false); } } diff --git a/include/aedis/read.hpp b/include/aedis/read.hpp index 88cd1ebd..98dbb22a 100644 --- a/include/aedis/read.hpp +++ b/include/aedis/read.hpp @@ -7,21 +7,6 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - #include #include #include @@ -35,11 +20,6 @@ namespace aedis { response_adapter_base* select_buffer(detail::response_adapters& buffers, resp3::type t, command cmd); -void forward( - command cmd, - resp3::type type, - receiver_base& recv); - // The parser supports up to 5 levels of nested structures. The first // element in the sizes stack is a sentinel and must be different from // 1. @@ -169,7 +149,7 @@ template < class CompletionToken = net::default_completion_token_t > -auto async_read( +auto async_read_one( AsyncReadStream& stream, Storage& buffer, response_adapter_base& res, @@ -247,15 +227,12 @@ auto async_read_type( using transaction_queue_type = std::deque>; // TODO: Convert into a composed operation with async_compose. -template < - class AsyncReadWriteStream, - class Storage, - class ResponseBuffers> +template < class AsyncReadWriteStream, class Storage> net::awaitable> -async_consume( +async_read( AsyncReadWriteStream& socket, Storage& buffer, - ResponseBuffers& resps, + response_buffers& bufs, std::queue& reqs) { auto const type = co_await async_read_type(socket, buffer, net::use_awaitable); @@ -268,8 +245,9 @@ async_consume( cmd = reqs.front().cmds.front(); } - auto* buf_adapter = select_buffer(resps, type, cmd); - co_await async_read(socket, buffer, *buf_adapter, net::use_awaitable); + detail::response_adapters adapters{bufs}; + auto* buf_adapter = select_buffer(adapters, type, cmd); + co_await async_read_one(socket, buffer, *buf_adapter, net::use_awaitable); if (type != resp3::type::push) { reqs.front().cmds.pop(); diff --git a/include/aedis/type.hpp b/include/aedis/type.hpp index ef68aade..d4750ba3 100644 --- a/include/aedis/type.hpp +++ b/include/aedis/type.hpp @@ -32,7 +32,7 @@ enum class type , verbatim_string , blob_string , streamed_string_part -, transaction // Not from resp3. +, transaction // Not from resp3. TODO: Perhaps rename to array and array to flat_array? , invalid }; diff --git a/tests/general.cpp b/tests/general.cpp index 8a13d5ab..ae0335aa 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -41,6 +41,62 @@ void check_equal_number(T const& a, T const& b, std::string const& msg = "") //------------------------------------------------------------------- +struct test_general_fill { + std::vector list_ {1 ,2, 3, 4, 5, 6}; + std::string set_ {"aaa"}; + + void operator()(pipeline& p) const + { + p.flushall(); + p.rpush("a", list_); + p.llen("a"); + p.lrange("a"); + p.ltrim("a", 2, -2); + p.lpop("a"); + //p.lpop("a", 2); // Not working? + p.set("b", {set_}); + p.get("b"); + p.append("b", "b"); + p.del("b"); + p.subscribe("channel"); + p.publish("channel", "message"); + p.incr("c"); + + //---------------------------------- + // transaction + for (auto i = 0; i < 3; ++i) { + p.multi(); + p.ping(); + p.ping(); + // TODO: It looks like we can't publish to a channel we + // are already subscribed to from inside a transaction. + //req.publish("some-channel", "message1"); + p.exec(); + } + //---------------------------------- + + std::map m1 = + { {"field1", "value1"} + , {"field2", "value2"}}; + + p.hset("d", m1); + p.hget("d", "field2"); + p.hgetall("d"); + p.hdel("d", {"field1", "field2"}); + p.hincrby("e", "some-field", 10); + + p.zadd("f", 1, "Marcelo"); + p.zrange("f"); + p.zrangebyscore("f", 1, 1); + p.zremrangebyscore("f", "-inf", "+inf"); + + p.sadd("g", std::vector{1, 2, 3}); + p.smembers("g"); + + p.quit(); + } +}; + net::awaitable test_general(net::ip::tcp::resolver::results_type const& res) { @@ -50,340 +106,148 @@ test_general(net::ip::tcp::resolver::results_type const& res) co_await net::async_connect(socket, res, net::use_awaitable); std::queue reqs; - response_buffers bufs; std::string buffer; prepare_queue(reqs); reqs.back().hello("3"); - std::vector list_ {1 ,2, 3, 4, 5, 6}; - std::string set_ {"aaa"}; + test_general_fill filler; co_await async_write(socket, net::buffer(reqs.back().payload), net::use_awaitable); - detail::response_adapters adapters{bufs}; + int push_counter = 0; + response_buffers bufs; for (;;) { - auto const event = co_await async_consume(socket, buffer, adapters, reqs); + auto const event = co_await async_read(socket, buffer, bufs, reqs); - switch (event.first) { - case command::hello: + switch (event.second) { + case resp3::type::simple_string: { - bufs.map.clear(); - auto const empty = prepare_queue(reqs); - reqs.back().flushall(); - reqs.back().rpush("a", list_); - reqs.back().llen("a"); - reqs.back().lrange("a"); - reqs.back().ltrim("a", 2, -2); - reqs.back().lpop("a"); - //reqs.back().lpop("a", 2); // Not working? - reqs.back().set("b", {set_}); - reqs.back().get("b"); - reqs.back().append("b", "b"); - reqs.back().del("b"); - reqs.back().subscribe("channel"); - reqs.back().publish("channel", "message"); - reqs.back().incr("c"); - - std::map m1 = - { {"field1", "value1"} - , {"field2", "value2"}}; - - reqs.back().hset("d", m1); - reqs.back().hget("d", "field2"); - reqs.back().hgetall("d"); - reqs.back().hdel("d", {"field1", "field2"}); - reqs.back().hincrby("e", "some-field", 10); - - reqs.back().zadd("f", 1, "Marcelo"); - reqs.back().zrange("f"); - reqs.back().zrangebyscore("f", 1, 1); - reqs.back().zremrangebyscore("f", "-inf", "+inf"); - - reqs.back().sadd("g", std::vector{1, 2, 3}); - reqs.back().smembers("g"); - - reqs.back().quit(); - if (empty) - co_await async_write_all(socket, reqs); - } break; - case command::get: - check_equal(event.second, resp3::type::blob_string, "get (type)"); - check_equal(bufs.blob_string, set_, "get (value)"); - bufs.blob_string.clear(); - break; - case command::lrange: - check_equal(event.second, resp3::type::array, "lrange (type)"); - check_equal(bufs.array, {"1", "2", "3", "4", "5", "6"}, "lrange (value)"); - bufs.array.clear(); - break; - case command::hget: - check_equal(event.second, resp3::type::blob_string, "hget (type)"); - check_equal(bufs.blob_string, std::string{"value2"}, "hget (value)"); - bufs.blob_string.clear(); - break; - case command::hgetall: - check_equal(event.second, resp3::type::map, "hgetall (type)"); - check_equal(bufs.map, {"field1", "value1", "field2", "value2"}, "hgetall (value)"); - bufs.map.clear(); - break; - case command::hvals: - check_equal(event.second, resp3::type::array, "hvals (type)"); - check_equal(bufs.array, {"value1", "value2"}, "hvals (value)"); - bufs.array.clear(); - break; - case command::zrange: - check_equal(event.second, resp3::type::array, "hvals (type)"); - check_equal(bufs.array, {"Marcelo"}, "hvals (value)"); - bufs.array.clear(); - break; - case command::zrangebyscore: - check_equal(event.second, resp3::type::array, "zrangebyscore (type)"); - check_equal(bufs.array, {"Marcelo"}, "zrangebyscore (value)"); - bufs.array.clear(); - break; - case command::smembers: - check_equal(event.second, resp3::type::set, "smembers (type)"); - check_equal(bufs.set, {"1", "2", "3"}, "smembers (value)"); - bufs.set.clear(); - break; - case command::set: - check_equal(event.second, resp3::type::simple_string, "set (type)"); - check_equal(bufs.simple_string, {"OK"}, "set (value)"); - bufs.simple_string.clear(); - break; - case command::quit: - check_equal(event.second, resp3::type::simple_string, "quit (type)"); - check_equal(bufs.simple_string, {"OK"}, "quit (value)"); - bufs.simple_string.clear(); - break; - case command::flushall: - check_equal(event.second, resp3::type::simple_string, "flushall (type)"); - check_equal(bufs.simple_string, {"OK"}, "flushall (value)"); - bufs.simple_string.clear(); - break; - case command::ltrim: - check_equal(event.second, resp3::type::simple_string, "ltrim (type)"); - check_equal(bufs.simple_string, {"OK"}, "ltrim (value)"); - bufs.simple_string.clear(); - break; - case command::append: - check_equal(event.second, resp3::type::number, "append (type)"); - check_equal(bufs.number, 4LL, "append (value)"); - break; - case command::hset: - check_equal(event.second, resp3::type::number, "hset (type)"); - check_equal(bufs.number, 2LL, "hset (value)"); - break; - case command::rpush: - check_equal(event.second, resp3::type::number, "rpush (type)"); - check_equal(bufs.number, (resp3::number)std::size(list_), "rpush (value)"); - break; - case command::del: - check_equal(event.second, resp3::type::number, "del (type)"); - check_equal(bufs.number, 1LL, "del (value)"); - break; - case command::llen: - check_equal(event.second, resp3::type::number, "llen (type)"); - check_equal(bufs.number, 6LL, "llen (value)"); - break; - case command::incr: - check_equal(event.second, resp3::type::number, "incr (type)"); - check_equal(bufs.number, 1LL, "incr (value)"); - break; - case command::publish: - check_equal(event.second, resp3::type::number, "publish (type)"); - check_equal(bufs.number, 1LL, "publish (value)"); - break; - case command::hincrby: - check_equal(event.second, resp3::type::number, "hincrby (type)"); - check_equal(bufs.number, 10LL, "hincrby (value)"); - break; - case command::zadd: - check_equal(event.second, resp3::type::number, "zadd (type)"); - check_equal(bufs.number, 1LL, "zadd (value)"); - break; - case command::sadd: - check_equal(event.second, resp3::type::number, "sadd (type)"); - check_equal(bufs.number, 3LL, "sadd (value)"); - bufs.simple_string.clear(); - break; - case command::hdel: - check_equal(event.second, resp3::type::number, "hdel (type)"); - check_equal(bufs.number, 2LL, "hdel (value)"); - bufs.simple_string.clear(); - break; - case command::zremrangebyscore: - check_equal(event.second, resp3::type::number, "zremrangebyscore (type)"); - check_equal(bufs.number, 1LL, "zremrangebyscore (value)"); - bufs.simple_string.clear(); - break; - case command::lpop: - { - if (event.second == resp3::type::array) { - check_equal(bufs.array, {"4", "5"}, "lpop (count) (value)"); - bufs.array.clear(); - } else if (event.second == resp3::type::blob_string) { - check_equal(bufs.blob_string, {"3"}, "lpop (value) (value)"); - bufs.array.clear(); - } else { - std::cout << "Error: in lpop test_general." << std::endl; + switch (event.first) { + case command::multi: check_equal(bufs.simple_string, {"OK"}, "multi"); break; + case command::ping: check_equal(bufs.simple_string, {"QUEUED"}, "ping"); break; + case command::set: check_equal(bufs.simple_string, {"OK"}, "set"); break; + case command::quit: check_equal(bufs.simple_string, {"OK"}, "quit"); break; + case command::flushall: check_equal(bufs.simple_string, {"OK"}, "flushall"); break; + case command::ltrim: check_equal(bufs.simple_string, {"OK"}, "ltrim"); break; + default: { + std::cout << "Error: " << event.first << " " << event.second << std::endl; + } } } break; - default: { - check_equal(event.second, resp3::type::push, "push (type)"); - // TODO: Check the responses below. - // {"subscribe", "channel", "1"} - // {"message", "channel", "message"} - // - //std::cout - // << "ERROR: unexpected event in test_general. " - // << event.first << " " - // << event.second << " " - // << std::endl; - } + case resp3::type::number: { + switch (event.first) { + case command::append: check_equal(bufs.number, 4LL, "append"); break; + case command::hset: check_equal(bufs.number, 2LL, "hset"); break; + case command::rpush: check_equal(bufs.number, (resp3::number)std::size(filler.list_), "rpush (value)"); break; + case command::del: check_equal(bufs.number, 1LL, "del"); break; + case command::llen: check_equal(bufs.number, 6LL, "llen"); break; + case command::incr: check_equal(bufs.number, 1LL, "incr"); break; + case command::publish: check_equal(bufs.number, 1LL, "publish"); break; + case command::hincrby: check_equal(bufs.number, 10LL, "hincrby"); break; + case command::zadd: check_equal(bufs.number, 1LL, "zadd"); break; + case command::sadd: check_equal(bufs.number, 3LL, "sadd"); break; + case command::hdel: check_equal(bufs.number, 2LL, "hdel"); break; + case command::zremrangebyscore: check_equal(bufs.number, 1LL, "zremrangebyscore"); break; + default: { + std::cout << "Error: " << event.first << " " << event.second << std::endl; + } + } + } break; + case resp3::type::blob_string: { + switch (event.first) { + case command::get: check_equal(bufs.blob_string, filler.set_, "get"); break; + case command::hget: check_equal(bufs.blob_string, std::string{"value2"}, "hget"); break; + case command::lpop: check_equal(bufs.blob_string, {"3"}, "lpop"); break; + default: { + std::cout << "Error: " << event.first << " " << event.second << std::endl; + } + } + } break; + case resp3::type::push: { + switch (push_counter) { + case 0: + check_equal(bufs.push, {"subscribe", "channel", "1"}, "push (value1)"); + break; + case 1: + check_equal(bufs.push, {"message", "channel", "message"}, "push (value2)"); + break; + defalt: + std::cout + << "ERROR: unexpected event in test_general. " + << event.first << " " + << event.second << " " + << std::endl; + } + ++push_counter; + } break; + case resp3::type::array: { + switch (event.first) { + case command::lrange: check_equal(bufs.array, {"1", "2", "3", "4", "5", "6"}, "lrange"); break; + case command::hvals: check_equal(bufs.array, {"value1", "value2"}, "hvals"); break; + case command::zrange: check_equal(bufs.array, {"Marcelo"}, "hvals"); break; + case command::zrangebyscore: check_equal(bufs.array, {"Marcelo"}, "zrangebyscore"); break; + case command::lpop: check_equal(bufs.array, {"4", "5"}, "lpop"); break; + case command::exec: + // TODO: Remove resp3::type::transaction? It is not resp3 + // native. + check_equal_number(event.second, resp3::type::array, "exec (type)"); + check_equal(std::size(bufs.transaction), 2lu, "exec (size)"); + + check_equal(bufs.transaction[0].cmd, command::unknown, "transaction ping (command)"); + check_equal(bufs.transaction[0].depth, 1, "transaction (depth)"); + check_equal(bufs.transaction[0].type, resp3::type::simple_string, "transaction (type)"); + check_equal(bufs.transaction[0].expected_size, 1, "transaction (size)"); + + check_equal(bufs.transaction[1].cmd, command::unknown, "transaction ping (command)"); + check_equal(bufs.transaction[1].depth, 1, "transaction (depth)"); + check_equal(bufs.transaction[1].type, resp3::type::simple_string, "transaction (typ)e"); + check_equal(bufs.transaction[1].expected_size, 1, "transaction (size)"); + + bufs.transaction.clear(); + break; + default: { + std::cout << "Error: " << event.first << " " << event.second << std::endl; + } + } + } break; + case resp3::type::map: { + switch (event.first) { + case command::hgetall: check_equal(bufs.map, {"field1", "value1", "field2", "value2"}, "hgetall (value)"); break; + case command::hello: + { + auto const empty = prepare_queue(reqs); + filler(reqs.back()); + if (empty) + co_await async_write_all(socket, reqs); + } break; + default: { + std::cout << "Error: " << event.first << " " << event.second << std::endl; + } + } + } break; + case resp3::type::set: { + switch (event.first) { + case command::smembers: check_equal(bufs.set, {"1", "2", "3"}, "smembers (value)"); break; + default: { + std::cout << "Error: " << event.first << " " << event.second << std::endl; + } + } + } break; + default: { + std::cout << "Error: " << event.first << " " << event.second << std::endl; + } } + + bufs.blob_string.clear(); + bufs.push.clear(); + bufs.map.clear(); + bufs.set.clear(); } } //------------------------------------------------------------------- -void trans_filler(auto& req) -{ - req.subscribe("some-channel"); - req.publish("some-channel", "message0"); - req.multi(); - req.ping(); - req.ping(); - - // TODO: It looks like we can't publish to a channel we are already - // subscribed to from inside a transaction. - //req.publish("some-channel", "message1"); - - req.exec(); - req.incr("a"); - req.publish("some-channel", "message2"); -}; - -class trans_receiver : public receiver_base { -private: - int counter_ = 0; - std::shared_ptr conn_; - response_buffers& buffers_; - -public: - trans_receiver(std::shared_ptr conn, response_buffers& bufs) : conn_{conn}, buffers_{bufs} { } - - void on_hello(resp3::type type) noexcept override - { - auto f = [this](auto& req) - { - req.flushall(); - trans_filler(req); - }; - - for (auto i = 0; i < 3; ++i) { - conn_->send(f); - } - } - - void on_push() noexcept override - { - auto& v = buffers_.push; - assert(std::size(v) == 3U); - - auto const i = counter_ % 3; - switch (i) { - case 0: - { - check_equal(v[0], {"subscribe"}, "on_push subscribe (transaction)"); - check_equal(v[1], {"some-channel"}, "on_push (transaction)"); - check_equal(v[2], {"1"}, "on_push (transaction)"); - } break; - case 1: - { - check_equal(v[0], {"message"}, "on_push message (transaction)"); - check_equal(v[1], {"some-channel"}, "on_push (transaction)"); - check_equal(v[2], {"message0"}, "on_push (transaction)"); - } break; - //case 2: // See not above. - //{ - // check_equal(v[0], {"message"}, "on_push message (transaction)"); - // check_equal(v[1], {"some-channel"}, "on_push (transaction)"); - // check_equal(v[2], {"message1"}, "on_push (transaction)"); - //} break; - case 2: - { - check_equal(v[0], {"message"}, "on_push message (transaction)"); - check_equal(v[1], {"some-channel"}, "on_push (transaction)"); - check_equal(v[2], {"message2"}, "on_push (transaction)"); - } break; - defaul: { - std::cout << "Error: on_push (transaction)" << std::endl; - } - } - - ++counter_; - v.clear(); - } - - void on_flushall(resp3::type type) noexcept override - { check_equal(buffers_.simple_string, {"OK"}, "flushall (transaction)"); } - - void on_exec(resp3::type type) noexcept override - { - check_equal(buffers_.transaction[0].cmd, command::unknown, "transaction ping (command)"); - check_equal(buffers_.transaction[0].depth, 1, "transaction (depth)"); - check_equal(buffers_.transaction[0].type, resp3::type::simple_string, "transaction (type)"); - check_equal(buffers_.transaction[0].expected_size, 1, "transaction (size)"); - - check_equal(buffers_.transaction[1].cmd, command::unknown, "transaction ping (command)"); - check_equal(buffers_.transaction[1].depth, 1, "transaction (depth)"); - check_equal(buffers_.transaction[1].type, resp3::type::simple_string, "transaction (typ)e"); - check_equal(buffers_.transaction[1].expected_size, 1, "transaction (size)"); - - // See note above - //check_equal(result[2].command, command::publish, "transaction publish (command)"); - //check_equal_number(result[2].depth, 1, "transaction (depth)"); - //check_equal_number(result[2].type, resp3::type::number, "transaction (type)"); - //check_equal_number(result[2].expected_size, 1, "transaction (size)"); - buffers_.transaction.clear(); - } - - void on_quit(resp3::type type) noexcept override - { check_equal(buffers_.simple_string, {"OK"}, "quit"); } - - void on_publish(resp3::type type) noexcept override - { - check_equal((int)buffers_.number, 1, "publish (transaction)"); - } - - void on_ping(resp3::type type) noexcept override - { - check_equal(buffers_.simple_string, {"QUEUED"}, "ping"); - conn_->send([this](auto& req) { req.quit(); }); - } - - void on_multi(resp3::type type) noexcept override - { - check_equal(buffers_.simple_string, {"OK"}, "multi"); - } -}; - -void test_trans() -{ - net::io_context ioc; - connection::config cfg{"127.0.0.1", "6379", 3, 10000}; - auto conn = std::make_shared(ioc.get_executor(), cfg); - - response_buffers bufs; - trans_receiver recv{conn, bufs}; - conn->start(recv, bufs); - ioc.run(); -} - -//------------------------------------------------------------------- - net::awaitable test_list(net::ip::tcp::resolver::results_type const& results) { @@ -407,55 +271,55 @@ test_list(net::ip::tcp::resolver::results_type const& results) { // hello detail::response_ignore res; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); } { // flushall resp3::simple_string buffer; detail::response_simple_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, {"OK"}, "flushall"); } { // rpush resp3::number buffer; detail::response_number res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, (long long int)6, "rpush"); } { // lrange resp3::array_int buffer; detail::response_basic_array res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, list, "lrange-1"); } { // lrange resp3::array_int buffer; detail::response_basic_array res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, std::vector{3, 4, 5}, "lrange-2"); } { // ltrim resp3::simple_string buffer; detail::response_simple_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, {"OK"}, "ltrim"); } { // lpop. Why a blob string instead of a number? resp3::blob_string buffer; detail::response_blob_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, {"3"}, "lpop"); } { // quit resp3::simple_string buffer; detail::response_simple_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, {"OK"}, "quit"); } } @@ -492,56 +356,56 @@ test_set(net::ip::tcp::resolver::results_type const& results) std::string buf; { // hello, flushall response_ignore res; - co_await async_read(socket, buf, res); - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); + co_await async_read_one(socket, buf, res); } { // set resp3::simple_string buffer; response_simple_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, {"OK"}, "set1"); } { // get resp3::blob_string buffer; response_blob_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, test_bulk1, "get1"); } { // set resp3::simple_string buffer; response_simple_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, {"OK"}, "set1"); } { // get resp3::blob_string buffer; response_blob_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, test_bulk2, "get2"); } { // set resp3::simple_string buffer; response_simple_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, {"OK"}, "set3"); } { // get resp3::blob_string buffer; response_blob_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, std::string {}, "get3"); } { // quit resp3::simple_string buffer; response_simple_string res{&buffer}; - co_await async_read(socket, buf, res); + co_await async_read_one(socket, buf, res); check_equal(buffer, {"OK"}, "quit"); } } @@ -563,7 +427,7 @@ net::awaitable test_simple_string() test_tcp_socket ts {cmd}; resp3::simple_string buffer; response_simple_string res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"OK"}, "simple_string"); //check_equal(res.attribute.value, {}, "simple_string (empty attribute)"); } @@ -574,7 +438,7 @@ net::awaitable test_simple_string() test_tcp_socket ts {cmd}; resp3::simple_string buffer; response_simple_string res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {}, "simple_string (empty)"); //check_equal(res.attribute.value, {}, "simple_string (empty attribute)"); } @@ -588,7 +452,7 @@ net::awaitable test_simple_string() // cmd += "\r\n"; // test_tcp_socket ts {cmd}; // response_simple_string res; - // co_await async_read(ts, buffer, res); + // co_await async_read_one(ts, buffer, res); // check_equal(res.result, str, "simple_string (large)"); // //check_equal(res.attribute.value, {}, "simple_string (empty attribute)"); //} @@ -603,7 +467,7 @@ net::awaitable test_number() test_tcp_socket ts {cmd}; resp3::number buffer; response_number res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, (long long int)-3, "number (int)"); } @@ -612,7 +476,7 @@ net::awaitable test_number() test_tcp_socket ts {cmd}; resp3::number buffer; response_number res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, (long long int)3, "number (unsigned)"); } @@ -621,7 +485,7 @@ net::awaitable test_number() test_tcp_socket ts {cmd}; resp3::number buffer; response_number res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, (long long int)1111111, "number (std::size_t)"); } } @@ -635,7 +499,7 @@ net::awaitable test_array() test_tcp_socket ts {cmd}; resp3::array buffer; response_array res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"one", "two", "three"}, "array (dynamic)"); } @@ -643,7 +507,7 @@ net::awaitable test_array() std::string cmd {"*3\r\n$3\r\none\r\n$3\r\ntwo\r\n$5\r\nthree\r\n"}; test_tcp_socket ts {cmd}; response_static_array res; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(res.result, {"one", "two", "three"}, "array (static)"); } @@ -651,7 +515,7 @@ net::awaitable test_array() std::string cmd {"*3\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n"}; test_tcp_socket ts {cmd}; response_static_array res; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(res.result, {1, 2, 3}, "array (int)"); } @@ -660,7 +524,7 @@ net::awaitable test_array() test_tcp_socket ts {cmd}; resp3::array buffer; response_array res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {}, "array (empty)"); } } @@ -674,7 +538,7 @@ net::awaitable test_blob_string() test_tcp_socket ts {cmd}; resp3::blob_string buffer; response_blob_string res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"hh"}, "blob_string"); } @@ -683,7 +547,7 @@ net::awaitable test_blob_string() test_tcp_socket ts {cmd}; resp3::blob_string buffer; response_blob_string res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"hhaa\aaaa\raaaaa\r\naaaaaaaaaa"}, "blob_string (with separator)"); } @@ -692,7 +556,7 @@ net::awaitable test_blob_string() test_tcp_socket ts {cmd}; resp3::blob_string buffer; response_blob_string res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {}, "blob_string (size 0)"); } } @@ -706,7 +570,7 @@ net::awaitable test_simple_error() test_tcp_socket ts {cmd}; resp3::simple_error buffer; response_simple_error res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"Error"}, "simple_error (message)"); } } @@ -720,7 +584,7 @@ net::awaitable test_floating_point() test_tcp_socket ts {cmd}; resp3::doublean buffer; response_double res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"1.23"}, "double"); } @@ -729,7 +593,7 @@ net::awaitable test_floating_point() test_tcp_socket ts {cmd}; resp3::doublean buffer; response_double res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"inf"}, "double (inf)"); } @@ -738,7 +602,7 @@ net::awaitable test_floating_point() test_tcp_socket ts {cmd}; resp3::doublean buffer; response_double res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"-inf"}, "double (-inf)"); } @@ -753,7 +617,7 @@ net::awaitable test_boolean() test_tcp_socket ts {cmd}; resp3::boolean buffer; response_bool res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, false, "bool (false)"); } @@ -762,7 +626,7 @@ net::awaitable test_boolean() test_tcp_socket ts {cmd}; resp3::boolean buffer; response_bool res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, true, "bool (true)"); } } @@ -776,7 +640,7 @@ net::awaitable test_blob_error() test_tcp_socket ts {cmd}; resp3::blob_error buffer; response_blob_error res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"SYNTAX invalid syntax"}, "blob_error (message)"); } @@ -785,7 +649,7 @@ net::awaitable test_blob_error() test_tcp_socket ts {cmd}; resp3::blob_error buffer; response_blob_error res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {}, "blob_error (empty message)"); } } @@ -799,7 +663,7 @@ net::awaitable test_verbatim_string() test_tcp_socket ts {cmd}; resp3::verbatim_string buffer; response_verbatim_string res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"txt:Some string"}, "verbatim_string"); } @@ -808,7 +672,7 @@ net::awaitable test_verbatim_string() test_tcp_socket ts {cmd}; resp3::verbatim_string buffer; response_verbatim_string res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {}, "verbatim_string (empty)"); } } @@ -822,7 +686,7 @@ net::awaitable test_set2() test_tcp_socket ts {cmd}; resp3::set buffer; response_set res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"orange", "apple", "one", "two", "three"}, "set"); } @@ -831,7 +695,7 @@ net::awaitable test_set2() test_tcp_socket ts {cmd}; resp3::set buffer; response_set res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"orange", "apple", "one", "two", "three"}, "set (flat)"); } @@ -840,7 +704,7 @@ net::awaitable test_set2() test_tcp_socket ts {cmd}; resp3::set buffer; response_set res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {}, "set (empty)"); } } @@ -854,7 +718,7 @@ net::awaitable test_map() test_tcp_socket ts {cmd}; resp3::map buffer; response_map res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"server", "redis", "version", "6.0.9", "proto", "3", "id", "203", "mode", "standalone", "role", "master", "modules"}, "map (flat)"); } @@ -863,7 +727,7 @@ net::awaitable test_map() test_tcp_socket ts {cmd}; resp3::map buffer; response_map res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {}, "map (flat - empty)"); } } @@ -877,7 +741,7 @@ net::awaitable test_streamed_string() test_tcp_socket ts {cmd}; resp3::streamed_string_part buffer; response_streamed_string_part res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {"Hello word"}, "streamed string"); } @@ -886,7 +750,7 @@ net::awaitable test_streamed_string() test_tcp_socket ts {cmd}; resp3::array buffer; response_array res{&buffer}; - co_await async_read(ts, buf, res); + co_await async_read_one(ts, buf, res); check_equal(buffer, {}, "streamed string (empty)"); } } @@ -898,7 +762,7 @@ net::awaitable offline() // std::string cmd {"|1\r\n+key-popularity\r\n%2\r\n$1\r\na\r\n,0.1923\r\n$1\r\nb\r\n,0.0012\r\n"}; // test_tcp_socket ts {cmd}; // response_array res; - // co_await async_read(ts, buf, res); + // co_await async_read_one(ts, buf, res); // check_equal(res.result, {"key-popularity", "a", "0.1923", "b", "0.0012"}, "attribute"); //} @@ -906,7 +770,7 @@ net::awaitable offline() // std::string cmd {">4\r\n+pubsub\r\n+message\r\n+foo\r\n+bar\r\n"}; // test_tcp_socket ts {cmd}; // response_array res; - // co_await async_read(ts, buf, res); + // co_await async_read_one(ts, buf, res); // check_equal(res.result, {"pubsub", "message", "foo", "bar"}, "push type"); //} @@ -914,7 +778,7 @@ net::awaitable offline() // std::string cmd {">0\r\n"}; // test_tcp_socket ts {cmd}; // response_array res; - // co_await async_read(ts, buf, res); + // co_await async_read_one(ts, buf, res); // check_equal(res.result, {}, "push type (empty)"); //} } @@ -941,6 +805,5 @@ int main(int argc, char* argv[]) co_spawn(ioc, test_set(res), net::detached); co_spawn(ioc, test_general(res), net::detached); ioc.run(); - test_trans(); }