diff --git a/INSTALL b/INSTALL index ccbd8674..606dc9b5 100644 --- a/INSTALL +++ b/INSTALL @@ -1 +1 @@ -CC=/opt/gcc-10.2.0/bin/gcc-10.2.0 CXX=/opt/gcc-10.2.0/bin/g++-10.2.0 CXXFLAGS="-std=c++20 -fcoroutines" ./configure --with-boost=/opt/boost_1_75_0 --with-boost-libdir=/opt/boost_1_75_0/lib +CC=/opt/gcc-10.2.0/bin/gcc-10.2.0 CXX=/opt/gcc-10.2.0/bin/g++-10.2.0 CXXFLAGS="-std=c++20 -fcoroutines -g" ./configure --with-boost=/opt/boost_1_75_0 --with-boost-libdir=/opt/boost_1_75_0/lib diff --git a/Makefile.am b/Makefile.am index a7f744f7..fcfb8cba 100644 --- a/Makefile.am +++ b/Makefile.am @@ -12,30 +12,27 @@ noinst_PROGRAMS = noinst_LIBRARIES = libaedis.a libaedis_a_SOURCES = -libaedis_a_SOURCES += $(top_srcdir)/aedis/aedis.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/config.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/connection.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/read.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/receiver_base.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/request.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/resp_types.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/type.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/command.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/version.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/write.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/impl/connection.ipp -libaedis_a_SOURCES += $(top_srcdir)/aedis/impl/read.ipp -libaedis_a_SOURCES += $(top_srcdir)/aedis/impl/src.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/impl/type.ipp -libaedis_a_SOURCES += $(top_srcdir)/aedis/impl/command.ipp -libaedis_a_SOURCES += $(top_srcdir)/aedis/detail/parser.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/detail/response_base.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/detail/response_buffers.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/detail/response_types.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/detail/responses.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/detail/utils.hpp -libaedis_a_SOURCES += $(top_srcdir)/aedis/detail/impl/parser.ipp -libaedis_a_SOURCES += $(top_srcdir)/aedis/detail/impl/response_buffers.ipp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/aedis.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/buffers.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/net.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/connection.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/receiver_base.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/pipeline.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/type.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/command.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/version.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/impl/connection.ipp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/impl/src.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/impl/type.ipp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/impl/command.ipp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/detail/write.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/detail/read.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/detail/parser.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/detail/response_adapter_base.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/detail/response_buffers.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/detail/utils.hpp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/detail/impl/read.ipp +libaedis_a_SOURCES += $(top_srcdir)/include/aedis/detail/impl/parser.ipp libaedis_a_SOURCES += $(top_srcdir)/src/aedis.cpp libaedis_a_CPPFLAGS = libaedis_a_CPPFLAGS += $(BOOST_CPPFLAGS) diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index fc24d1ab..92a8b44c 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -10,35 +10,46 @@ using namespace aedis; -class myreceiver : public receiver_base { -private: - std::shared_ptr conn_; - -public: - myreceiver(std::shared_ptr conn) : conn_{conn} { } - - void on_hello(resp3::array& v) noexcept override - { - conn_->ping(); - conn_->psubscribe({"aaa*"}); - conn_->quit(); +void print_helper(command cmd, resp3::type type, buffers& buf) +{ + switch (type) { + case resp3::type::simple_string: + { + std::cout << cmd << " " << buf.simple_string << " (" << type << ")" << std::endl; + } break; + case resp3::type::push: + case resp3::type::map: + { + std::cout << cmd << " (" << type << ")" << std::endl; + } break; + default:{} } +} - void on_ping(resp3::simple_string& s) noexcept override - { std::cout << "PING: " << s << std::endl; } +struct myreceiver { + std::shared_ptr conn; + buffers& buf; - void on_quit(resp3::simple_string& s) noexcept override - { std::cout << "QUIT: " << s << std::endl; } + void operator()(command cmd, resp3::type type) const + { + if (cmd == command::hello) { + assert(type == resp3::type::map); + conn->ping(); + conn->psubscribe({"aaa*"}); + conn->quit(); + } - void on_push(resp3::array& s) noexcept override - { std::cout << "on_push: "; print(s); } + print_helper(cmd, type, buf); + } }; int main() { net::io_context ioc {1}; auto conn = std::make_shared(ioc.get_executor()); - myreceiver recv{conn}; - conn->start(recv); + + buffers bufs; + myreceiver recv{conn, bufs}; + conn->run(std::ref(recv), bufs); ioc.run(); } diff --git a/include/aedis/buffers.hpp b/include/aedis/buffers.hpp new file mode 100644 index 00000000..35d43bc3 --- /dev/null +++ b/include/aedis/buffers.hpp @@ -0,0 +1,34 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include + +namespace aedis { + +struct buffers { + resp3::transaction transaction; + resp3::array array; + resp3::array push; + resp3::set set; + resp3::map map; + resp3::array attribute; + resp3::simple_string simple_string; + resp3::simple_error simple_error; + resp3::number number; + resp3::doublean doublean; + resp3::boolean boolean; + resp3::big_number big_number; + resp3::blob_string blob_string; + resp3::blob_error blob_error; + resp3::verbatim_string verbatim_string; + resp3::streamed_string_part streamed_string_part; +}; + +} // aedis + diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index ee9fd73d..e7c3c898 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -10,8 +10,9 @@ #include #include +#include #include -#include +#include #include "net.hpp" #include "type.hpp" @@ -46,14 +47,31 @@ public: private: net::ip::tcp::socket socket_; std::string buffer_; - detail::buffers buffers_; - detail::response_buffers resps_; std::queue reqs_; - bool reconnect_ = false; config conf_; boost::system::error_code ec_; - net::awaitable worker_coro(receiver_base& recv); + template + net::awaitable worker_coro(Receiver receiver, buffers& bufs) + { + try { + auto ex = co_await net::this_coro::executor; + + net::ip::tcp::resolver resolver{ex}; + auto const res = resolver.resolve(conf_.host, conf_.port); + + co_await async_connect(socket_, res, net::use_awaitable); + + send([](auto& req) { req.hello(); }); + + detail::response_adapters adapters{bufs}; + for (;;) { + auto const event = co_await detail::async_consume(socket_, buffer_, adapters, reqs_); + receiver(event.first, event.second); + } + } catch (...) { + } + } public: /// Contructs a connection. @@ -62,7 +80,18 @@ public: config const& conf = config {"127.0.0.1", "6379", 1000, 10000}); /// Stablishes the connection with the redis server. - void start(receiver_base& recv); + void start(receiver_base& recv, buffers& bufs); + + template + void run(Receiver receiver, buffers& bufs) + { + auto self = this->shared_from_this(); + + auto f = [self, receiver, &bufs] () mutable + { return self->worker_coro(receiver, bufs); }; + + net::co_spawn(socket_.get_executor(), f, net::detached); + } /** Adds commands to the ouput queue. The Filler signature must be * @@ -95,8 +124,6 @@ public: auto queue_size() const noexcept { return std::ssize(reqs_); } - void enable_reconnect() noexcept; - /// Adds ping to the request, see https://redis.io/commands/bgrewriteaof void ping() { send([](auto& req){ req.ping();}); } diff --git a/include/aedis/detail/impl/parser.ipp b/include/aedis/detail/impl/parser.ipp index a907ce89..747f7d85 100644 --- a/include/aedis/detail/impl/parser.ipp +++ b/include/aedis/detail/impl/parser.ipp @@ -20,10 +20,10 @@ long long length(char const* p) return len; } -parser::parser(response_base* res) +parser::parser(response_adapter_base* res) { init(res); } -void parser::init(response_base* res) +void parser::init(response_adapter_base* res) { res_ = res; depth_ = 0; diff --git a/include/aedis/detail/impl/read.ipp b/include/aedis/detail/impl/read.ipp index a9618a47..2eeadeb9 100644 --- a/include/aedis/detail/impl/read.ipp +++ b/include/aedis/detail/impl/read.ipp @@ -7,206 +7,87 @@ #include -#define EXPAND_RECEIVER_CASE(var, x) case command::x: recv.on_##x(*var.result); break +#define EXPAND_RECEIVER_CASE(type, cmd) case command::cmd: recv.on_##cmd(type); break namespace aedis { namespace detail { -bool queue_pop(std::queue& reqs) -{ - assert(!std::empty(reqs)); - assert(!std::empty(reqs.front().cmds)); - - reqs.front().cmds.pop(); - if (std::empty(reqs.front().cmds)) { - reqs.pop(); - return true; - } - - return false; -} - -void -forward_transaction( - resp3::transaction_result& result, - std::deque> const& ids, - receiver_base& recv) -{ - assert(std::size(ids) == std::size(result)); - - for (auto i = 0U; i < std::size(ids); ++i) - result[i].cmd = ids[i].first; - - recv.on_transaction(result); -} - void forward( - response_buffers& buffers, command cmd, resp3::type type, receiver_base& recv) { - switch (type) { - case resp3::type::set: - { - switch (cmd) { - EXPAND_RECEIVER_CASE(buffers.resp_set, smembers); - default: {assert(false);} - } - buffers.resp_set.result->clear(); - } break; - case resp3::type::map: - { - switch (cmd) { - EXPAND_RECEIVER_CASE(buffers.resp_map, hello); - EXPAND_RECEIVER_CASE(buffers.resp_map, hgetall); - default: {assert(false);} - } - buffers.resp_map.result->clear(); - } break; - case resp3::type::array: - { - switch (cmd) { - EXPAND_RECEIVER_CASE(buffers.resp_array, acl_list); - EXPAND_RECEIVER_CASE(buffers.resp_array, acl_users); - EXPAND_RECEIVER_CASE(buffers.resp_array, acl_getuser); - EXPAND_RECEIVER_CASE(buffers.resp_array, acl_cat); - EXPAND_RECEIVER_CASE(buffers.resp_array, acl_log); - EXPAND_RECEIVER_CASE(buffers.resp_array, acl_help); - EXPAND_RECEIVER_CASE(buffers.resp_array, lrange); - EXPAND_RECEIVER_CASE(buffers.resp_array, lpop); - EXPAND_RECEIVER_CASE(buffers.resp_array, zrange); - EXPAND_RECEIVER_CASE(buffers.resp_array, zrangebyscore); - EXPAND_RECEIVER_CASE(buffers.resp_array, hvals); - default: {assert(false);} - } - buffers.resp_array.result->clear(); - } break; - case resp3::type::simple_string: - { - switch (cmd) { - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, acl_load); - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, acl_save); - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, acl_setuser); - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, acl_log); - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, ping); - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, quit); - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, flushall); - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, ltrim); - EXPAND_RECEIVER_CASE(buffers.resp_simple_string, set); - default: {assert(false);} - } - buffers.resp_simple_string.result->clear(); - } break; - case resp3::type::number: - { - switch (cmd) { - EXPAND_RECEIVER_CASE(buffers.resp_number, acl_deluser); - EXPAND_RECEIVER_CASE(buffers.resp_number, rpush); - EXPAND_RECEIVER_CASE(buffers.resp_number, del); - EXPAND_RECEIVER_CASE(buffers.resp_number, llen); - EXPAND_RECEIVER_CASE(buffers.resp_number, publish); - EXPAND_RECEIVER_CASE(buffers.resp_number, incr); - EXPAND_RECEIVER_CASE(buffers.resp_number, append); - EXPAND_RECEIVER_CASE(buffers.resp_number, hset); - EXPAND_RECEIVER_CASE(buffers.resp_number, hincrby); - EXPAND_RECEIVER_CASE(buffers.resp_number, zadd); - EXPAND_RECEIVER_CASE(buffers.resp_number, zremrangebyscore); - EXPAND_RECEIVER_CASE(buffers.resp_number, expire); - EXPAND_RECEIVER_CASE(buffers.resp_number, sadd); - EXPAND_RECEIVER_CASE(buffers.resp_number, hdel); - default: {assert(false);} - } - } break; - case resp3::type::doublean: - { - switch (cmd) { - default: {assert(false);} - } - } break; - case resp3::type::big_number: - { - switch (cmd) { - default: {assert(false);} - } - buffers.resp_big_number.result->clear(); - } break; - case resp3::type::boolean: - { - switch (cmd) { - default: {assert(false);} - } - *buffers.resp_boolean.result = false; - } break; - case resp3::type::blob_string: - { - switch (cmd) { - EXPAND_RECEIVER_CASE(buffers.resp_blob_string, acl_genpass); - EXPAND_RECEIVER_CASE(buffers.resp_blob_string, acl_whoami); - EXPAND_RECEIVER_CASE(buffers.resp_blob_string, lpop); - EXPAND_RECEIVER_CASE(buffers.resp_blob_string, get); - EXPAND_RECEIVER_CASE(buffers.resp_blob_string, hget); - default: {assert(false);} - } - buffers.resp_blob_string.result->clear(); - } break; - case resp3::type::verbatim_string: - { - switch (cmd) { - default: {assert(false);} - } - buffers.resp_verbatim_string.result->clear(); - } break; - case resp3::type::streamed_string_part: - { - switch (cmd) { - default: {assert(false);} - } - buffers.resp_streamed_string_part.result->clear(); - } break; - case resp3::type::simple_error: - { - recv.on_simple_error(cmd, *buffers.resp_simple_error.result); - buffers.resp_simple_error.result->clear(); - } break; - case resp3::type::blob_error: - { - recv.on_blob_error(cmd, *buffers.resp_blob_error.result); - buffers.resp_blob_error.result->clear(); - } break; - case resp3::type::null: - { - recv.on_null(cmd); - } break; - case resp3::type::attribute: - { - throw std::runtime_error("Attribute are not supported yet."); - } break; - default: - { - assert(false); - } + switch (cmd) { + EXPAND_RECEIVER_CASE(type, acl_cat); + EXPAND_RECEIVER_CASE(type, acl_deluser); + EXPAND_RECEIVER_CASE(type, acl_genpass); + EXPAND_RECEIVER_CASE(type, acl_getuser); + EXPAND_RECEIVER_CASE(type, acl_help); + EXPAND_RECEIVER_CASE(type, acl_list); + EXPAND_RECEIVER_CASE(type, acl_load); + EXPAND_RECEIVER_CASE(type, acl_log); + EXPAND_RECEIVER_CASE(type, acl_save); + EXPAND_RECEIVER_CASE(type, acl_setuser); + EXPAND_RECEIVER_CASE(type, acl_users); + EXPAND_RECEIVER_CASE(type, acl_whoami); + EXPAND_RECEIVER_CASE(type, append); + EXPAND_RECEIVER_CASE(type, del); + EXPAND_RECEIVER_CASE(type, exec); + EXPAND_RECEIVER_CASE(type, expire); + EXPAND_RECEIVER_CASE(type, flushall); + EXPAND_RECEIVER_CASE(type, get); + EXPAND_RECEIVER_CASE(type, hdel); + EXPAND_RECEIVER_CASE(type, hello); + EXPAND_RECEIVER_CASE(type, hget); + EXPAND_RECEIVER_CASE(type, hgetall); + EXPAND_RECEIVER_CASE(type, hincrby); + EXPAND_RECEIVER_CASE(type, hset); + EXPAND_RECEIVER_CASE(type, hvals); + EXPAND_RECEIVER_CASE(type, incr); + EXPAND_RECEIVER_CASE(type, llen); + EXPAND_RECEIVER_CASE(type, lpop); + EXPAND_RECEIVER_CASE(type, lrange); + EXPAND_RECEIVER_CASE(type, ltrim); + EXPAND_RECEIVER_CASE(type, multi); + EXPAND_RECEIVER_CASE(type, ping); + EXPAND_RECEIVER_CASE(type, publish); + EXPAND_RECEIVER_CASE(type, quit); + EXPAND_RECEIVER_CASE(type, rpush); + EXPAND_RECEIVER_CASE(type, sadd); + EXPAND_RECEIVER_CASE(type, set); + EXPAND_RECEIVER_CASE(type, smembers); + EXPAND_RECEIVER_CASE(type, zadd); + EXPAND_RECEIVER_CASE(type, zrange); + EXPAND_RECEIVER_CASE(type, zrangebyscore); + EXPAND_RECEIVER_CASE(type, zremrangebyscore); + default: {assert(false);} } } -response_base* select_buffer(response_buffers& buffers, resp3::type type) +response_adapter_base* select_buffer(response_adapters& adapters, resp3::type type, command cmd) { + if (type == resp3::type::push) + return &adapters.resp_push; + + if (cmd == command::exec) + return &adapters.resp_transaction; + switch (type) { - case resp3::type::set: return &buffers.resp_set; - case resp3::type::map: return &buffers.resp_map; - case resp3::type::attribute: return &buffers.resp_attribute; - case resp3::type::array: return &buffers.resp_array; - case resp3::type::simple_error: return &buffers.resp_simple_error; - case resp3::type::simple_string: return &buffers.resp_simple_string; - case resp3::type::number: return &buffers.resp_number; - case resp3::type::doublean: return &buffers.resp_double; - case resp3::type::big_number: return &buffers.resp_big_number; - case resp3::type::boolean: return &buffers.resp_boolean; - case resp3::type::blob_error: return &buffers.resp_blob_error; - case resp3::type::blob_string: return &buffers.resp_blob_string; - case resp3::type::verbatim_string: return &buffers.resp_verbatim_string; - case resp3::type::streamed_string_part: return &buffers.resp_streamed_string_part; - case resp3::type::null: return &buffers.resp_ignore; + case resp3::type::set: return &adapters.resp_set; + case resp3::type::map: return &adapters.resp_map; + case resp3::type::attribute: return &adapters.resp_attribute; + case resp3::type::array: return &adapters.resp_array; + case resp3::type::simple_error: return &adapters.resp_simple_error; + case resp3::type::simple_string: return &adapters.resp_simple_string; + case resp3::type::number: return &adapters.resp_number; + case resp3::type::doublean: return &adapters.resp_double; + case resp3::type::big_number: return &adapters.resp_big_number; + case resp3::type::boolean: return &adapters.resp_boolean; + case resp3::type::blob_error: return &adapters.resp_blob_error; + case resp3::type::blob_string: return &adapters.resp_blob_string; + case resp3::type::verbatim_string: return &adapters.resp_verbatim_string; + case resp3::type::streamed_string_part: return &adapters.resp_streamed_string_part; + case resp3::type::null: return &adapters.resp_ignore; default: { throw std::runtime_error("response_buffers"); return nullptr; diff --git a/include/aedis/detail/parser.hpp b/include/aedis/detail/parser.hpp index 7a2be6cf..7b36a8cc 100644 --- a/include/aedis/detail/parser.hpp +++ b/include/aedis/detail/parser.hpp @@ -9,7 +9,7 @@ #include -#include "response_base.hpp" +#include "response_adapter_base.hpp" namespace aedis { namespace detail { @@ -25,13 +25,13 @@ public: }; private: - response_base* res_; + response_adapter_base* res_; int depth_; int sizes_[6]; // Streaming will require a bigger integer. bulk_type bulk_; int bulk_length_; - void init(response_base* res); + void init(response_adapter_base* res); long long on_array_impl(char const* data, int m = 1); void on_array(char const* data) { res_->select_array(on_array_impl(data, 1)); } void on_push(char const* data) { res_->select_push(on_array_impl(data, 1)); } @@ -54,7 +54,7 @@ private: std::string_view handle_simple_string(char const* data, std::size_t n); public: - parser(response_base* res); + parser(response_adapter_base* res); std::size_t advance(char const* data, std::size_t n); auto done() const noexcept { return depth_ == 0 && bulk_ == bulk_type::none; } auto bulk() const noexcept { return bulk_; } diff --git a/include/aedis/detail/read.hpp b/include/aedis/detail/read.hpp index 6858879a..9d2294f5 100644 --- a/include/aedis/detail/read.hpp +++ b/include/aedis/detail/read.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -26,20 +27,15 @@ #include #include "parser.hpp" -#include "response_buffers.hpp" -#include "response_base.hpp" +#include "response_adapters.hpp" +#include "response_adapter_base.hpp" +#include "write.hpp" namespace aedis { namespace detail { -response_base* select_buffer(response_buffers& buffers, resp3::type t); - -void forward_transaction( - resp3::transaction_result& result, - std::deque> const& ids, - receiver_base& recv); +response_adapter_base* select_buffer(response_adapters& buffers, resp3::type t, command cmd); void forward( - response_buffers& buffers, command cmd, resp3::type type, receiver_base& recv); @@ -56,7 +52,7 @@ private: int start_ = 1; public: - parse_op(AsyncReadStream& stream, Storage* buf, response_base* res) + parse_op(AsyncReadStream& stream, Storage* buf, response_adapter_base* res) : stream_ {stream} , buf_ {buf} , parser_ {res} @@ -121,7 +117,7 @@ template auto read( SyncReadStream& stream, Storage& buf, - response_base& res, + response_adapter_base& res, boost::system::error_code& ec) { parser p {&res}; @@ -156,7 +152,7 @@ std::size_t read( SyncReadStream& stream, Storage& buf, - response_base& res) + response_adapter_base& res) { boost::system::error_code ec; auto const n = read(stream, buf, res, ec); @@ -176,7 +172,7 @@ template < auto async_read( AsyncReadStream& stream, Storage& buffer, - response_base& res, + response_adapter_base& res, CompletionToken&& token = net::default_completion_token_t{}) { @@ -193,13 +189,11 @@ class type_op { private: AsyncReadStream& stream_; Storage* buf_ = nullptr; - resp3::type* t_; public: - type_op(AsyncReadStream& stream, Storage* buf, resp3::type* t) + type_op(AsyncReadStream& stream, Storage* buf) : stream_ {stream} , buf_ {buf} - , t_ {t} { assert(buf_); } @@ -209,8 +203,10 @@ public: , boost::system::error_code ec = {} , std::size_t n = 0) { - if (ec) - return self.complete(ec); + if (ec) { + self.complete(ec, resp3::type::invalid); + return; + } if (std::empty(*buf_)) { net::async_read_until( @@ -222,8 +218,11 @@ public: } assert(!std::empty(*buf_)); - *t_ = resp3::to_type(buf_->front()); - return self.complete(ec); + auto const type = resp3::to_type(buf_->front()); + // TODO: when type = resp3::type::invalid should we report an error or + // complete normally and let the caller check whether it is invalid. + self.complete(ec, type); + return; } }; @@ -236,152 +235,67 @@ template < auto async_read_type( AsyncReadStream& stream, Storage& buffer, - resp3::type& t, CompletionToken&& token = net::default_completion_token_t{}) { return net::async_compose < CompletionToken - , void(boost::system::error_code) - >(type_op {stream, &buffer, &t}, - token, - stream); + , void(boost::system::error_code, resp3::type) + >(type_op {stream, &buffer}, token, stream); } -// Returns true when a new pipeline can be sent to redis. -bool queue_pop(std::queue& reqs); - using transaction_queue_type = std::deque>; -// TODO: Implement as a composed operation. -template -net::awaitable -async_read_transaction( - AsyncReadWriteStream& socket, - Storage& buffer, - response_base& reader, - std::queue& reqs, - boost::system::error_code& ec) -{ - transaction_queue_type trans; - for (;;) { - auto const cmd = reqs.front().cmds.front(); - if (cmd != command::exec) { - response_static_string<6> tmp; - co_await async_read(socket, buffer, tmp, net::redirect_error(net::use_awaitable, ec)); - - if (ec) - co_return transaction_queue_type{}; - - // Failing to QUEUE a command inside a trasaction is - // considered an application error. The multi command - // always gets a "OK" response and all other commands get - // QUEUED unless the user is e.g. using wrong data types. - auto const* res = cmd == command::multi ? "OK" : "QUEUED"; - assert (tmp.result == res); - - // Pushes the command in the transction command queue that will be - // processed when exec arrives. - trans.push_back({reqs.front().cmds.front(), resp3::type::invalid}); - reqs.front().cmds.pop(); - continue; - } - - if (cmd == command::exec) { - assert(trans.front().first == command::multi); - co_await async_read(socket, buffer, reader, net::redirect_error(net::use_awaitable, ec)); - if (ec) - co_return transaction_queue_type{}; - - trans.pop_front(); // Removes multi. - co_return trans; - } - assert(false); - } - - co_return transaction_queue_type{}; -} - -// TODO: Handle errors properly. +// TODO: Convert into a composed operation with async_compose. template < class AsyncReadWriteStream, class Storage, - class Receiver, class ResponseBuffers> -net::awaitable -async_reader( +net::awaitable> +async_consume( AsyncReadWriteStream& socket, Storage& buffer, ResponseBuffers& resps, - Receiver& recv, - std::queue& reqs, - boost::system::error_code& ec) + std::queue& reqs) { - for (;;) { - auto type = resp3::type::invalid; - co_await async_read_type(socket, buffer, type, net::redirect_error(net::use_awaitable, ec)); - - if (ec) - co_return; - - if (type == resp3::type::invalid) { - // TODO: Add our own error code. - assert(false); - } - - if (type == resp3::type::push) { - co_await async_read( - socket, - buffer, - resps.resp_push, - net::redirect_error(net::use_awaitable, ec)); - - if (ec) - co_return; - - recv.on_push(*resps.resp_push.result); - continue; - } + auto const type = co_await detail::async_read_type(socket, buffer, net::use_awaitable); + assert(type != resp3::type::invalid); + auto cmd = command::unknown; + if (type != resp3::type::push) { assert(!std::empty(reqs)); assert(!std::empty(reqs.front().cmds)); - - if (reqs.front().cmds.front() == command::multi) { - // The exec response is an array where each element is the - // response of one command in the transaction. This requires - // a special response buffer, that can deal with recursive - // data types. - auto const trans_queue = - co_await async_read_transaction( - socket, - buffer, - resps.resp_tree, - reqs, - ec); - - if (ec) - co_return; - - forward_transaction(*resps.resp_tree.result, trans_queue, recv); - - if (queue_pop(reqs)) - co_await async_write_all(socket, reqs, ec); - - continue; - } - - auto const cmd = reqs.front().cmds.front(); - auto* tmp = select_buffer(resps, type); - co_await async_read(socket, buffer, *tmp, net::redirect_error(net::use_awaitable, ec)); - - if (ec) - co_return; - - forward(resps, cmd, type, recv); - - if (queue_pop(reqs)) - co_await async_write_all(socket, reqs, ec); + cmd = reqs.front().cmds.front(); } + + auto* buf_adapter = select_buffer(resps, type, cmd); + co_await detail::async_read(socket, buffer, *buf_adapter, net::use_awaitable); + + if (type != resp3::type::push) { + reqs.front().cmds.pop(); + + // If that were that last command in the pipeline, delete the pipeline too. + if (std::empty(reqs.front().cmds)) { + reqs.pop(); + // Now we should write any the next pipeline waiting in the + // queue. Notice that commands like unsubscribe have a push + // response type so we do not have to wait for a response before + // sending a new pipeline. + while (!std::empty(reqs)) { + auto buffer = net::buffer(reqs.front().payload); + auto const n = co_await async_write(socket, buffer, net::use_awaitable); + if (!std::empty(reqs.front().cmds)) + break; + + // We only pop when all commands in the pipeline has push + // responses like subscribe, otherwise, pop is done when the + // response arrives. + reqs.pop(); + } + } + } + + co_return std::make_pair(cmd, type); } } // detail diff --git a/include/aedis/detail/response_base.hpp b/include/aedis/detail/response_adapter_base.hpp similarity index 97% rename from include/aedis/detail/response_base.hpp rename to include/aedis/detail/response_adapter_base.hpp index e62f9ef3..4b4393c1 100644 --- a/include/aedis/detail/response_base.hpp +++ b/include/aedis/detail/response_adapter_base.hpp @@ -14,7 +14,7 @@ namespace aedis { namespace detail { -struct response_base { +struct response_adapter_base { virtual void pop() {} virtual void on_simple_string(std::string_view s) { throw std::runtime_error("on_simple_string: Has not been overridden."); } virtual void on_simple_error(std::string_view s) { throw std::runtime_error("on_simple_error: Has not been overridden."); } @@ -32,7 +32,7 @@ struct response_base { virtual void select_map(int n) { throw std::runtime_error("select_map: Has not been overridden."); } virtual void select_push(int n) { throw std::runtime_error("select_push: Has not been overridden."); } virtual void select_attribute(int n) { throw std::runtime_error("select_attribute: Has not been overridden."); } - virtual ~response_base() {} + virtual ~response_adapter_base() {} }; } // detail diff --git a/include/aedis/detail/responses.hpp b/include/aedis/detail/response_adapters.hpp similarity index 80% rename from include/aedis/detail/responses.hpp rename to include/aedis/detail/response_adapters.hpp index 0a4450a8..8264a45e 100644 --- a/include/aedis/detail/responses.hpp +++ b/include/aedis/detail/response_adapters.hpp @@ -17,12 +17,12 @@ #include #include -#include #include -#include #include +#include +#include -#include "response_base.hpp" +#include "response_adapter_base.hpp" #include @@ -41,7 +41,7 @@ inline void from_string_view(std::string_view s, std::string& r) { r = s; } -struct response_ignore : response_base { +struct response_ignore : response_adapter_base { void on_simple_string(std::string_view s) override {} void on_simple_error(std::string_view s) override {} void on_number(std::string_view s) override {} @@ -62,11 +62,11 @@ struct response_ignore : response_base { // This response type is able to deal with recursive redis responses // as in a transaction for example. -class response_tree: public response_base { +class response_tree: public response_adapter_base { public: - resp3::transaction_result* result; + resp3::transaction* result; - response_tree(resp3::transaction_result* p) : result(p) {} + response_tree(resp3::transaction* p) : result(p) {} private: int depth_ = 0; @@ -118,7 +118,7 @@ public: void pop() override { --depth_; } }; -struct response_number : public response_base { +struct response_number : public response_adapter_base { resp3::number* result = nullptr; response_number(resp3::number* p) : result(p) {} @@ -127,7 +127,7 @@ struct response_number : public response_base { { from_string_view(s, *result); } }; -struct response_blob_string : public response_base { +struct response_blob_string : public response_adapter_base { resp3::blob_string* result = nullptr; response_blob_string(resp3::blob_string* p) : result(p) {} @@ -136,7 +136,7 @@ struct response_blob_string : public response_base { { from_string_view(s, *result); } }; -struct response_blob_error : public response_base { +struct response_blob_error : public response_adapter_base { resp3::blob_error* result = nullptr; response_blob_error(resp3::blob_error* p) : result(p) {} @@ -145,7 +145,7 @@ struct response_blob_error : public response_base { { from_string_view(s, *result); } }; -struct response_simple_string : public response_base { +struct response_simple_string : public response_adapter_base { resp3::simple_string* result = nullptr; response_simple_string(resp3::simple_string* p) : result(p) {} @@ -154,7 +154,7 @@ struct response_simple_string : public response_base { { from_string_view(s, *result); } }; -struct response_simple_error : public response_base { +struct response_simple_error : public response_adapter_base { resp3::simple_error* result = nullptr; response_simple_error(resp3::simple_error* p) : result(p) {} @@ -163,7 +163,7 @@ struct response_simple_error : public response_base { { from_string_view(s, *result); } }; -struct response_big_number : public response_base { +struct response_big_number : public response_adapter_base { resp3::big_number* result = nullptr; response_big_number(resp3::big_number* p) : result(p) {} @@ -172,7 +172,7 @@ struct response_big_number : public response_base { { from_string_view(s, *result); } }; -struct response_double : public response_base { +struct response_double : public response_adapter_base { resp3::doublean* result = nullptr; response_double(resp3::doublean* p) : result(p) {} @@ -181,7 +181,7 @@ struct response_double : public response_base { { *result = s; } }; -struct response_verbatim_string : public response_base { +struct response_verbatim_string : public response_adapter_base { resp3::verbatim_string* result = nullptr; response_verbatim_string(resp3::verbatim_string* p) : result(p) {} @@ -190,7 +190,7 @@ struct response_verbatim_string : public response_base { { from_string_view(s, *result); } }; -struct response_streamed_string_part : public response_base { +struct response_streamed_string_part : public response_adapter_base { resp3::streamed_string_part* result = nullptr; response_streamed_string_part(resp3::streamed_string_part* p) : result(p) {} @@ -199,7 +199,7 @@ struct response_streamed_string_part : public response_base { { *result += s; } }; -struct response_bool : public response_base { +struct response_bool : public response_adapter_base { resp3::boolean* result = nullptr; response_bool(resp3::boolean* p) : result(p) {} @@ -216,7 +216,7 @@ template< class Compare = std::less, class Allocator = std::allocator > -struct response_unordered_set : response_base { +struct response_unordered_set : response_adapter_base { void on_blob_string(std::string_view s) override { Key r; @@ -231,7 +231,7 @@ struct response_unordered_set : response_base { }; template -struct response_basic_array : response_base { +struct response_basic_array : response_adapter_base { resp3::basic_array* result = nullptr; response_basic_array(resp3::basic_array* p) : result(p) {} @@ -260,7 +260,7 @@ struct response_basic_array : response_base { using response_array = response_basic_array; -struct response_map : response_base { +struct response_map : response_adapter_base { resp3::map* result = nullptr; response_map(resp3::map* p) : result(p) {} @@ -287,7 +287,7 @@ struct response_map : response_base { void on_blob_string(std::string_view s = {}) override { add(s); } }; -struct response_set : response_base { +struct response_set : response_adapter_base { resp3::set* result = nullptr; response_set(resp3::set* p) : result(p) {} @@ -311,7 +311,7 @@ struct response_set : response_base { }; template -struct response_static_array : response_base { +struct response_static_array : response_adapter_base { int i_ = 0; void on_blob_string(std::string_view s) override { from_string_view(s, result[i_++]); } @@ -320,7 +320,7 @@ struct response_static_array : response_base { }; template -struct response_static_string : response_base { +struct response_static_string : response_adapter_base { void add(std::string_view s) { result.assign(std::cbegin(s), std::cend(s));}; void on_blob_string(std::string_view s) override @@ -335,7 +335,7 @@ template < class T, std::size_t N > -struct response_basic_static_map : response_base { +struct response_basic_static_map : response_adapter_base { int i_ = 0; void add(std::string_view s = {}) @@ -350,5 +350,44 @@ struct response_basic_static_map : response_base { std::array result; }; +struct response_adapters { + response_tree resp_transaction; + response_array resp_array; + response_array resp_push; + response_set resp_set; + response_map resp_map; + response_array resp_attribute; + response_simple_string resp_simple_string; + response_simple_error resp_simple_error; + response_number resp_number; + response_double resp_double; + response_bool resp_boolean; + response_big_number resp_big_number; + response_blob_string resp_blob_string; + response_blob_error resp_blob_error; + response_verbatim_string resp_verbatim_string; + response_streamed_string_part resp_streamed_string_part; + response_ignore resp_ignore; + + response_adapters(buffers& buf) + : resp_transaction{&buf.transaction} + , resp_array{&buf.array} + , resp_push{&buf.push} + , resp_set{&buf.set} + , resp_map{&buf.map} + , resp_attribute{&buf.attribute} + , resp_simple_string{&buf.simple_string} + , resp_simple_error{&buf.simple_error} + , resp_number{&buf.number} + , resp_double{&buf.doublean} + , resp_boolean{&buf.boolean} + , resp_big_number{&buf.big_number} + , resp_blob_string{&buf.blob_string} + , resp_blob_error{&buf.blob_error} + , resp_verbatim_string{&buf.verbatim_string} + , resp_streamed_string_part{&buf.streamed_string_part} + { } +}; + } // detail } // aedis diff --git a/include/aedis/detail/response_buffers.hpp b/include/aedis/detail/response_buffers.hpp deleted file mode 100644 index 10505e79..00000000 --- a/include/aedis/detail/response_buffers.hpp +++ /dev/null @@ -1,77 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#pragma once - -#include -#include -#include - -#include "responses.hpp" - -namespace aedis { namespace detail { - -struct buffers { - resp3::transaction_result tree; - resp3::array array; - resp3::array push; - resp3::set set; - resp3::map map; - resp3::array attribute; - resp3::simple_string simple_string; - resp3::simple_error simple_error; - resp3::number number; - resp3::doublean doublean; - resp3::boolean boolean; - resp3::big_number big_number; - resp3::blob_string blob_string; - resp3::blob_error blob_error; - resp3::verbatim_string verbatim_string; - resp3::streamed_string_part streamed_string_part; -}; - -struct response_buffers { - response_tree resp_tree; - response_array resp_array; - response_array resp_push; - response_set resp_set; - response_map resp_map; - response_array resp_attribute; - response_simple_string resp_simple_string; - response_simple_error resp_simple_error; - response_number resp_number; - response_double resp_double; - response_bool resp_boolean; - response_big_number resp_big_number; - response_blob_string resp_blob_string; - response_blob_error resp_blob_error; - response_verbatim_string resp_verbatim_string; - response_streamed_string_part resp_streamed_string_part; - response_ignore resp_ignore; - - response_buffers(buffers& buf) - : resp_tree{&buf.tree} - , resp_array{&buf.array} - , resp_push{&buf.push} - , resp_set{&buf.set} - , resp_map{&buf.map} - , resp_attribute{&buf.attribute} - , resp_simple_string{&buf.simple_string} - , resp_simple_error{&buf.simple_error} - , resp_number{&buf.number} - , resp_double{&buf.doublean} - , resp_boolean{&buf.boolean} - , resp_big_number{&buf.big_number} - , resp_blob_string{&buf.blob_string} - , resp_blob_error{&buf.blob_error} - , resp_verbatim_string{&buf.verbatim_string} - , resp_streamed_string_part{&buf.streamed_string_part} - { } -}; - -} // detail -} // aedis diff --git a/include/aedis/impl/commands.ipp b/include/aedis/impl/command.ipp similarity index 100% rename from include/aedis/impl/commands.ipp rename to include/aedis/impl/command.ipp diff --git a/include/aedis/impl/connection.ipp b/include/aedis/impl/connection.ipp index c54e71a6..a5d20a5f 100644 --- a/include/aedis/impl/connection.ipp +++ b/include/aedis/impl/connection.ipp @@ -13,60 +13,26 @@ namespace aedis { connection::connection(net::any_io_executor const& ioc, config const& conf) : socket_{ioc} -, resps_{buffers_} , conf_{conf} { } -net::awaitable connection::worker_coro(receiver_base& recv) -{ - auto ex = co_await net::this_coro::executor; - - net::ip::tcp::resolver resolver{ex}; - auto const res = resolver.resolve(conf_.host, conf_.port); - - net::steady_timer timer {ex}; - std::chrono::seconds wait_interval {1}; - - do { - boost::system::error_code ec; - co_await async_connect( - socket_, - res, - net::redirect_error(net::use_awaitable, ec)); - - if (ec) { - socket_.close(); - timer.expires_after(wait_interval); - co_await timer.async_wait(net::use_awaitable); - continue; - } - - send([](auto& req) { req.hello(); }); - - co_await detail::async_reader(socket_, buffer_, resps_, recv, reqs_, ec); - - if (ec) { - socket_.close(); - timer.expires_after(wait_interval); - co_await timer.async_wait(net::use_awaitable); - continue; - } - } while (reconnect_); -} - -void connection::start(receiver_base& recv) +void connection::start(receiver_base& recv, buffers& bufs) { auto self = this->shared_from_this(); - net::co_spawn( - socket_.get_executor(), - [self, &recv] () mutable { return self->worker_coro(recv); }, - net::detached); -} -void connection::enable_reconnect() noexcept -{ - reconnect_ = false; + auto receiver = [&](auto cmd, auto type) + { + switch (type) { + case resp3::type::push: recv.on_push(); break; + default: detail::forward(cmd, type, recv); + } + }; + + auto f = [self, receiver, &bufs] () mutable + { return self->worker_coro(receiver, bufs); }; + + net::co_spawn(socket_.get_executor(), f, net::detached); } } // aedis diff --git a/include/aedis/impl/src.hpp b/include/aedis/impl/src.hpp index 4e6c7ac2..eaa784c9 100644 --- a/include/aedis/impl/src.hpp +++ b/include/aedis/impl/src.hpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include diff --git a/include/aedis/receiver_base.hpp b/include/aedis/receiver_base.hpp index 2f755d49..22916a00 100644 --- a/include/aedis/receiver_base.hpp +++ b/include/aedis/receiver_base.hpp @@ -15,8 +15,7 @@ namespace aedis { -#define RECEIVER_FUNCTION_REF(name, cmd) virtual void on_##cmd(resp3::name& v) noexcept { } -#define RECEIVER_FUNCTION(name, cmd) virtual void on_##cmd(resp3::name v) noexcept { } +#define RECEIVER_FUNCTION(cmd) virtual void on_##cmd(resp3::type type) noexcept { } /** Receiver base class. * @@ -30,153 +29,49 @@ namespace aedis { class receiver_base { public: - - /// Receiver of an acl_list command. - RECEIVER_FUNCTION_REF(array, acl_list); - - /// Receiver of an acl_users command. - RECEIVER_FUNCTION_REF(array, acl_users); - - /// Receiver of an acl_getuser command. - RECEIVER_FUNCTION_REF(array, acl_getuser); - - /// Receiver of an acl_cat command. - RECEIVER_FUNCTION_REF(array, acl_cat); - - /// Receiver of an acl_log command. - RECEIVER_FUNCTION_REF(array, acl_log); - - /// Receiver of an acl_help command. - RECEIVER_FUNCTION_REF(array, acl_help); - - /// Receiver of an lrange command. - RECEIVER_FUNCTION_REF(array, lrange); - - /// Receiver of an lpop command. - RECEIVER_FUNCTION_REF(array, lpop); - - /// Receiver of an hgetall command. - RECEIVER_FUNCTION_REF(array, hgetall); - - /// Receiver of an hvals command. - RECEIVER_FUNCTION_REF(array, hvals); - - /// Receiver of an zrange command. - RECEIVER_FUNCTION_REF(array, zrange); - - /// Receiver of an zrangebyscore command. - RECEIVER_FUNCTION_REF(array, zrangebyscore); - - - /// Receiver of an hello command. - RECEIVER_FUNCTION_REF(map, hello); - - - /// Receiver of an smembers command. - RECEIVER_FUNCTION_REF(set, smembers); - - - /// Receiver of an acl_load command. - RECEIVER_FUNCTION_REF(simple_string, acl_load); - - /// Receiver of an acl_save command. - RECEIVER_FUNCTION_REF(simple_string, acl_save); - - /// Receiver of an acl_setuser command. - RECEIVER_FUNCTION_REF(simple_string, acl_setuser); - - /// Receiver of an acl_log command. - RECEIVER_FUNCTION_REF(simple_string, acl_log); - - /// Receiver of an acl_ping command. - RECEIVER_FUNCTION_REF(simple_string, ping); - - /// Receiver of an quit command. - RECEIVER_FUNCTION_REF(simple_string, quit); - - /// Receiver of an flushall command. - RECEIVER_FUNCTION_REF(simple_string, flushall); - - /// Receiver of an ltrim command. - RECEIVER_FUNCTION_REF(simple_string, ltrim); - - /// Receiver of an set command. - RECEIVER_FUNCTION_REF(simple_string, set); - - - /// Receiver of an acl_deluser command. - RECEIVER_FUNCTION(number, acl_deluser); - - /// Receiver of an rpush command. - RECEIVER_FUNCTION(number, rpush); - - /// Receiver of an del command. - RECEIVER_FUNCTION(number, del); - - /// Receiver of an llen command. - RECEIVER_FUNCTION(number, llen); - - /// Receiver of an publish command. - RECEIVER_FUNCTION(number, publish); - - /// Receiver of an incr command. - RECEIVER_FUNCTION(number, incr); - - /// Receiver of an append command. - RECEIVER_FUNCTION(number, append); - - /// Receiver of an hset command. - RECEIVER_FUNCTION(number, hset); - - /// Receiver of an hincrby command. - RECEIVER_FUNCTION(number, hincrby); - - /// Receiver of an zadd command. - RECEIVER_FUNCTION(number, zadd); - - /// Receiver of an zremrangebyscore command. - RECEIVER_FUNCTION(number, zremrangebyscore); - - /// Receiver of an expire command. - RECEIVER_FUNCTION(number, expire); - - /// Receiver of an sadd command. - RECEIVER_FUNCTION(number, sadd); - - /// Receiver of an hdel command. - RECEIVER_FUNCTION(number, hdel); - - - /// Receiver of an acl_genpass command. - RECEIVER_FUNCTION_REF(blob_string, acl_genpass); - - /// Receiver of an acl_whoami command. - RECEIVER_FUNCTION_REF(blob_string, acl_whoami); - - /// Receiver of an lpop command. - RECEIVER_FUNCTION_REF(blob_string, lpop); - - /// Receiver of an get command. - RECEIVER_FUNCTION_REF(blob_string, get); - - /// Receiver of an hget command. - RECEIVER_FUNCTION_REF(blob_string, hget); - - /// Callback for push notifications - virtual void on_push(resp3::array& v) noexcept { } - - /// Callback for simple error. - virtual void on_simple_error(command cmd, resp3::simple_error& v) noexcept { } - - /// Callback for blob error. - virtual void on_blob_error(command cmd, resp3::blob_error& v) noexcept { } - - /// Callback from null responses. - virtual void on_null(command cmd) noexcept { } - - /// Receives a transaction - virtual void - on_transaction(resp3::transaction_result& result) noexcept { } + RECEIVER_FUNCTION(acl_cat); + RECEIVER_FUNCTION(acl_deluser); + RECEIVER_FUNCTION(acl_genpass); + RECEIVER_FUNCTION(acl_getuser); + RECEIVER_FUNCTION(acl_help); + RECEIVER_FUNCTION(acl_list); + RECEIVER_FUNCTION(acl_load); + RECEIVER_FUNCTION(acl_log); + RECEIVER_FUNCTION(acl_save); + RECEIVER_FUNCTION(acl_setuser); + RECEIVER_FUNCTION(acl_users); + RECEIVER_FUNCTION(acl_whoami); + RECEIVER_FUNCTION(append); + RECEIVER_FUNCTION(del); + RECEIVER_FUNCTION(exec); + RECEIVER_FUNCTION(expire); + RECEIVER_FUNCTION(flushall); + RECEIVER_FUNCTION(get); + RECEIVER_FUNCTION(hdel); + RECEIVER_FUNCTION(hello); + RECEIVER_FUNCTION(hget); + RECEIVER_FUNCTION(hgetall); + RECEIVER_FUNCTION(hincrby); + RECEIVER_FUNCTION(hset); + RECEIVER_FUNCTION(hvals); + RECEIVER_FUNCTION(incr); + RECEIVER_FUNCTION(llen); + RECEIVER_FUNCTION(lpop); + RECEIVER_FUNCTION(lrange); + RECEIVER_FUNCTION(ltrim); + RECEIVER_FUNCTION(multi); + RECEIVER_FUNCTION(ping); + RECEIVER_FUNCTION(publish); + RECEIVER_FUNCTION(quit); + RECEIVER_FUNCTION(rpush); + RECEIVER_FUNCTION(sadd); + RECEIVER_FUNCTION(set); + RECEIVER_FUNCTION(smembers); + RECEIVER_FUNCTION(zadd); + RECEIVER_FUNCTION(zrange); + RECEIVER_FUNCTION(zrangebyscore); + RECEIVER_FUNCTION(zremrangebyscore); + virtual void on_push() noexcept { } }; } // aedis diff --git a/include/aedis/type.hpp b/include/aedis/type.hpp index 76469291..ef68aade 100644 --- a/include/aedis/type.hpp +++ b/include/aedis/type.hpp @@ -32,6 +32,7 @@ enum class type , verbatim_string , blob_string , streamed_string_part +, transaction // Not from resp3. , invalid }; @@ -46,6 +47,8 @@ using basic_array = std::vector; using array = basic_array; using array_int = basic_array; +using push = std::vector; + /// RESP3 map type. using map = std::vector; @@ -71,7 +74,7 @@ struct transaction_item { std::vector value; }; -using transaction_result = std::vector; +using transaction = std::vector; } // resp3 } // aedis diff --git a/tests/general.cpp b/tests/general.cpp index b5d845df..cb7a7a2e 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -43,14 +43,15 @@ void check_equal_number(T const& a, T const& b, std::string const& msg = "") class test_receiver : public receiver_base { private: std::shared_ptr conn_; + buffers& buffers_; std::vector list_ {1 ,2, 3, 4, 5, 6}; std::string set_ {"aaa"}; public: - test_receiver(std::shared_ptr conn) : conn_{conn} { } + test_receiver(std::shared_ptr conn, buffers& bufs) : conn_{conn}, buffers_{bufs} { } - void on_hello(resp3::array& v) noexcept override + void on_hello(resp3::type type) noexcept override { auto f = [this](auto& req) { @@ -91,14 +92,10 @@ public: }; conn_->send(f); + buffers_.map.clear(); } - void on_simple_error(command cmd, resp3::simple_error& v) noexcept override - { - std::cout << v << std::endl; - } - - void on_push(resp3::array& v) noexcept override + void on_push() noexcept override { // TODO: Check the responses below. // {"subscribe", "channel", "1"} @@ -106,87 +103,132 @@ public: check_equal(1, 1, "push (receiver)"); } - // Blob string. - void on_get(resp3::blob_string& s) noexcept override - { check_equal(s, set_, "get (receiver)"); } + void on_get(resp3::type type) noexcept override + { + check_equal(buffers_.blob_string, set_, "get (receiver)"); + buffers_.blob_string.clear(); + } - void on_hget(resp3::blob_string& s) noexcept override - { check_equal(s, std::string{"value2"}, "hget (receiver)"); } + void on_hget(resp3::type type) noexcept override + { + check_equal(buffers_.blob_string, std::string{"value2"}, "hget (receiver)"); + buffers_.blob_string.clear(); + } - void on_lpop(resp3::blob_string& s) noexcept override - { check_equal(s, {"3"}, "lpop (receiver)"); } + void on_lrange(resp3::type type) noexcept override + { + check_equal(buffers_.array, {"1", "2", "3", "4", "5", "6"}, "lrange (receiver)"); + buffers_.array.clear(); + } - // Array - void on_lrange(resp3::array& v) noexcept override - { check_equal(v, {"1", "2", "3", "4", "5", "6"}, "lrange (receiver)"); } + void on_lpop(resp3::type type) noexcept override + { + if (type == resp3::type::array) { + check_equal(buffers_.array, {"4", "5"}, "lpop(count) (receiver)"); + buffers_.array.clear(); + return; + } - void on_lpop(resp3::array& s) noexcept override - { check_equal(s, {"4", "5"}, "lpop(count) (receiver)"); } + if (type == resp3::type::blob_string) { + check_equal(buffers_.blob_string, {"3"}, "lpop(count) (receiver)"); + buffers_.array.clear(); + return; + } - void on_hgetall(resp3::array& s) noexcept override - { check_equal(s, {"field1", "value1", "field2", "value2"}, "hgetall (receiver)"); } + assert(false); + } - void on_hvals(resp3::array& s) noexcept override - { check_equal(s, {"value1", "value2"}, "hvals (receiver)"); } + void on_hgetall(resp3::type type) noexcept override + { + check_equal(buffers_.map, {"field1", "value1", "field2", "value2"}, "hgetall (receiver)"); + buffers_.array.clear(); + } - void on_zrange(resp3::array& s) noexcept override - { check_equal(s, {"Marcelo"}, "zrange (receiver)"); } + void on_hvals(resp3::type type) noexcept override + { + check_equal(buffers_.array, {"value1", "value2"}, "hvals (receiver)"); + buffers_.array.clear(); + } - void on_zrangebyscore(resp3::array& s) noexcept override - { check_equal(s, {"Marcelo"}, "zrangebyscore (receiver)"); } + void on_zrange(resp3::type type) noexcept override + { + check_equal(buffers_.array, {"Marcelo"}, "zrange (receiver)"); + buffers_.array.clear(); + } - void on_smembers(resp3::set& s) noexcept override - { check_equal(s, {"1", "2", "3"}, "smembers (receiver)"); } + void on_zrangebyscore(resp3::type type) noexcept override + { + check_equal(buffers_.array, {"Marcelo"}, "zrangebyscore (receiver)"); + buffers_.array.clear(); + } + + void on_smembers(resp3::type type) noexcept override + { + check_equal(buffers_.set, {"1", "2", "3"}, "smembers (receiver)"); + buffers_.array.clear(); + } // Simple string - void on_set(resp3::simple_string& s) noexcept override - { check_equal(s, {"OK"}, "set (receiver)"); } + void on_set(resp3::type type) noexcept override + { + check_equal(buffers_.simple_string, {"OK"}, "set (receiver)"); + buffers_.simple_string.clear(); + } - void on_quit(resp3::simple_string& s) noexcept override - { check_equal(s, {"OK"}, "quit (receiver)"); } + void on_quit(resp3::type type) noexcept override + { + check_equal(buffers_.simple_string, {"OK"}, "quit (receiver)"); + buffers_.simple_string.clear(); + } - void on_flushall(resp3::simple_string& s) noexcept override - { check_equal(s, {"OK"}, "flushall (receiver)"); } + void on_flushall(resp3::type type) noexcept override + { + check_equal(buffers_.simple_string, {"OK"}, "flushall (receiver)"); + buffers_.simple_string.clear(); + } - void on_ltrim(resp3::simple_string& s) noexcept override - { check_equal(s, {"OK"}, "ltrim (receiver)"); } + void on_ltrim(resp3::type type) noexcept override + { + check_equal(buffers_.simple_string, {"OK"}, "ltrim (receiver)"); + buffers_.simple_string.clear(); + } // Number - void on_append(resp3::number n) noexcept override - { check_equal((int)n, 4, "append (receiver)"); } + void on_append(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 4, "append (receiver)"); } - void on_hset(resp3::number n) noexcept override - { check_equal((int)n, 2, "hset (receiver)"); } + void on_hset(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 2, "hset (receiver)"); } - void on_rpush(resp3::number n) noexcept override - { check_equal(n, (resp3::number)std::size(list_), "rpush (receiver)"); } + void on_rpush(resp3::type type) noexcept override + { check_equal(buffers_.number, (resp3::number)std::size(list_), "rpush (receiver)"); } - void on_del(resp3::number n) noexcept override - { check_equal((int)n, 1, "del (receiver)"); } + void on_del(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 1, "del (receiver)"); } - void on_llen(resp3::number n) noexcept override - { check_equal((int)n, 6, "llen (receiver)"); } + void on_llen(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 6, "llen (receiver)"); } - void on_incr(resp3::number n) noexcept override - { check_equal((int)n, 1, "incr (receiver)"); } + void on_incr(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 1, "incr (receiver)"); } - void on_publish(resp3::number n) noexcept override - { check_equal((int)n, 1, "publish (receiver)"); } + void on_publish(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 1, "publish (receiver)"); } - void on_hincrby(resp3::number n) noexcept override - { check_equal((int)n, 10, "hincrby (receiver)"); } + void on_hincrby(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 10, "hincrby (receiver)"); } - void on_zadd(resp3::number n) noexcept override - { check_equal((int)n, 1, "zadd (receiver)"); } + void on_zadd(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 1, "zadd (receiver)"); } - void on_zremrangebyscore(resp3::number s) noexcept override - { check_equal((int)s, 1, "zremrangebyscore (receiver)"); } + void on_zremrangebyscore(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 1, "zremrangebyscore (receiver)"); } - void on_sadd(resp3::number n) noexcept override - { check_equal((int)n, 3, "sadd (receiver)"); } + void on_sadd(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 3, "sadd (receiver)"); } - void on_hdel(resp3::number n) noexcept override - { check_equal((int)n, 2, "hdel (receiver)"); } + void on_hdel(resp3::type type) noexcept override + { check_equal((int)buffers_.number, 2, "hdel (receiver)"); } }; @@ -194,8 +236,10 @@ void test_receiver_1() { net::io_context ioc; auto conn = std::make_shared(ioc.get_executor()); - test_receiver recv{conn}; - conn->start(recv); + + buffers bufs; + test_receiver recv{conn, bufs}; + conn->start(recv, bufs); ioc.run(); } @@ -204,29 +248,32 @@ void test_receiver_1() class ping_receiver : public receiver_base { private: std::shared_ptr conn_; + buffers& buffers_; public: - ping_receiver(std::shared_ptr conn) : conn_{conn} { } + ping_receiver(std::shared_ptr conn, buffers& bufs) : conn_{conn}, buffers_{bufs} { } - void on_hello(resp3::array& v) noexcept override + void on_hello(resp3::type type) noexcept override { conn_->ping(); } - void on_ping(resp3::simple_string& s) noexcept override + void on_ping(resp3::type type) noexcept override { - check_equal(s, {"PONG"}, "ping"); + check_equal(buffers_.simple_string, {"PONG"}, "ping"); conn_->quit(); } - void on_quit(resp3::simple_string& s) noexcept override - { check_equal(s, {"OK"}, "quit"); } + void on_quit(resp3::type type) noexcept override + { check_equal(buffers_.simple_string, {"OK"}, "quit"); } }; void test_ping() { net::io_context ioc; auto conn = std::make_shared(ioc.get_executor()); - ping_receiver recv{conn}; - conn->start(recv); + + buffers bufs; + ping_receiver recv{conn, bufs}; + conn->start(recv, bufs); ioc.run(); } @@ -245,7 +292,7 @@ void trans_filler(auto& req) //req.publish("some-channel", "message1"); req.exec(); - req.ping(); + req.incr("a"); req.publish("some-channel", "message2"); }; @@ -253,11 +300,12 @@ class trans_receiver : public receiver_base { private: int counter_ = 0; std::shared_ptr conn_; + buffers& buffers_; public: - trans_receiver(std::shared_ptr conn) : conn_{conn} { } + trans_receiver(std::shared_ptr conn, buffers& bufs) : conn_{conn}, buffers_{bufs} { } - void on_hello(resp3::array& v) noexcept override + void on_hello(resp3::type type) noexcept override { auto f = [this](auto& req) { @@ -270,8 +318,9 @@ public: } } - void on_push(resp3::array& v) noexcept override + void on_push() noexcept override { + auto& v = buffers_.push; assert(std::size(v) == 3U); auto const i = counter_ % 3; @@ -309,53 +358,58 @@ public: v.clear(); } - void on_flushall(resp3::simple_string& s) noexcept override - { check_equal(s, {"OK"}, "flushall (transaction)"); } + void on_flushall(resp3::type type) noexcept override + { check_equal(buffers_.simple_string, {"OK"}, "flushall (transaction)"); } - void - on_transaction(resp3::transaction_result& result) noexcept override + void on_exec(resp3::type type) noexcept override { - check_equal(result[0].cmd, command::ping, "transaction ping (command)"); - check_equal(result[0].depth, 1, "transaction (depth)"); - check_equal(result[0].type, resp3::type::simple_string, "transaction (type)"); - check_equal(result[0].expected_size, 1, "transaction (size)"); + check_equal(buffers_.transaction[0].cmd, command::unknown, "transaction ping (command)"); + check_equal(buffers_.transaction[0].depth, 1, "transaction (depth)"); + check_equal(buffers_.transaction[0].type, resp3::type::simple_string, "transaction (type)"); + check_equal(buffers_.transaction[0].expected_size, 1, "transaction (size)"); - check_equal(result[1].cmd, command::ping, "transaction incr (command)"); - check_equal(result[1].depth, 1, "transaction (depth)"); - check_equal(result[1].type, resp3::type::simple_string, "transaction (typ)e"); - check_equal(result[1].expected_size, 1, "transaction (size)"); + check_equal(buffers_.transaction[1].cmd, command::unknown, "transaction ping (command)"); + check_equal(buffers_.transaction[1].depth, 1, "transaction (depth)"); + check_equal(buffers_.transaction[1].type, resp3::type::simple_string, "transaction (typ)e"); + check_equal(buffers_.transaction[1].expected_size, 1, "transaction (size)"); // See note above //check_equal(result[2].command, command::publish, "transaction publish (command)"); //check_equal_number(result[2].depth, 1, "transaction (depth)"); //check_equal_number(result[2].type, resp3::type::number, "transaction (type)"); //check_equal_number(result[2].expected_size, 1, "transaction (size)"); - result.clear(); + buffers_.transaction.clear(); } - void on_quit(resp3::simple_string& s) noexcept override - { check_equal(s, {"OK"}, "quit"); } + void on_quit(resp3::type type) noexcept override + { check_equal(buffers_.simple_string, {"OK"}, "quit"); } - void on_publish(resp3::number n) noexcept override + void on_publish(resp3::type type) noexcept override { - check_equal((int)n, 1, "publish (transaction)"); + check_equal((int)buffers_.number, 1, "publish (transaction)"); } - void on_ping(resp3::simple_string& s) noexcept override + void on_ping(resp3::type type) noexcept override { - check_equal(s, {"PONG"}, "ping"); + check_equal(buffers_.simple_string, {"QUEUED"}, "ping"); conn_->send([this](auto& req) { req.quit(); }); } + + void on_multi(resp3::type type) noexcept override + { + check_equal(buffers_.simple_string, {"OK"}, "multi"); + } }; void test_trans() { net::io_context ioc; connection::config cfg{"127.0.0.1", "6379", 3, 10000}; - auto conn = std::make_shared(ioc.get_executor(), cfg); - trans_receiver recv{conn}; - conn->start(recv); + + buffers bufs; + trans_receiver recv{conn, bufs}; + conn->start(recv, bufs); ioc.run(); }