From 63712da04a6ac1136da6205ac48bb35bd57d2b19 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 6 Feb 2021 20:20:16 +0100 Subject: [PATCH] Changes in the connection class and the receiver. --- examples/async_basic.cpp | 48 +++++++++---- examples/async_pubsub.cpp | 49 +++++++++---- examples/sync_basic.cpp | 7 +- include/aedis/aedis.hpp | 12 +--- include/aedis/command.hpp | 21 ++++-- include/aedis/config.hpp | 15 ++++ include/aedis/connection.hpp | 98 ++++++++++++++++++++++++++ include/aedis/read.hpp | 11 +-- include/aedis/receiver.hpp | 44 ++++++++++++ include/aedis/receiver_print.hpp | 114 ------------------------------- include/aedis/request.hpp | 4 +- include/aedis/response.hpp | 41 +++++++---- include/aedis/response_types.hpp | 26 +++++++ include/aedis/type.hpp | 9 +++ include/aedis/write.hpp | 6 +- 15 files changed, 318 insertions(+), 187 deletions(-) create mode 100644 include/aedis/config.hpp create mode 100644 include/aedis/connection.hpp create mode 100644 include/aedis/receiver.hpp delete mode 100644 include/aedis/receiver_print.hpp create mode 100644 include/aedis/response_types.hpp diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 7ff5f1a9..1f3fc513 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -7,31 +7,55 @@ #include #include -#include - -#include namespace net = aedis::net; using namespace aedis; using tcp = net::ip::tcp; -void fill1(resp::request& req) +enum class myevents +{ one +, two +, three +, ignore +}; + +void fill(resp::request& req) { - req.ping(); + req.ping(myevents::one); req.rpush("list", {1, 2, 3}); - req.multi(); req.lrange("list"); - req.exec(); - req.ping(); + req.ping(myevents::two); } +class myreceiver : public receiver_base { +private: + std::shared_ptr> conn_; + +public: + using event_type = myevents; + + myreceiver(std::shared_ptr> conn) + : conn_{conn} + { } + + void on_hello(myevents ev, resp::response_array::data_type& v) noexcept override + { + resp::print(v); + conn_->send(fill); + } +}; + int main() { net::io_context ioc {1}; - auto conn = std::make_shared>(ioc); - resp::receiver_base recv; - conn->start(recv); - conn->send(fill1); + + tcp::resolver resolver{ioc}; + auto const results = resolver.resolve("127.0.0.1", "6379"); + + auto conn = std::make_shared>(ioc); + myreceiver recv{conn}; + + conn->start(recv, results); ioc.run(); } diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp index 1c820143..75713594 100644 --- a/examples/async_pubsub.cpp +++ b/examples/async_pubsub.cpp @@ -7,33 +7,56 @@ #include #include -#include - -#include namespace net = aedis::net; using namespace aedis; using tcp = net::ip::tcp; -enum class myevent {zero, one, two, ignore}; +enum class myevents +{ one +, two +, three +, ignore +}; -void fill1(resp::request& req) +void fill(resp::request& req) { - req.ping(); + req.ping(myevents::one); req.rpush("list", {1, 2, 3}); - req.multi(); req.lrange("list"); - req.exec(); - req.ping(); + req.ping(myevents::two); } +class myreceiver : public receiver_base { +private: + std::shared_ptr> conn_; + +public: + using event_type = myevents; + + myreceiver(std::shared_ptr> conn) + : conn_{conn} + { } + + void on_hello(myevents ev, resp::response_array::data_type& v) noexcept override + { + resp::print(v); + conn_->send(fill); + } +}; + int main() { net::io_context ioc {1}; - auto conn = std::make_shared>(ioc); - resp::receiver_base recv; - conn->start(recv); - conn->send(fill1); + + tcp::resolver resolver{ioc}; + auto const results = resolver.resolve("127.0.0.1", "6379"); + + auto conn = std::make_shared>(ioc); + myreceiver recv{conn}; + + conn->start(recv, results); ioc.run(); } + diff --git a/examples/sync_basic.cpp b/examples/sync_basic.cpp index 59752053..3fbc0c45 100644 --- a/examples/sync_basic.cpp +++ b/examples/sync_basic.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -8,7 +8,6 @@ #include #include -namespace net = aedis::net; using namespace aedis; int main() @@ -21,8 +20,8 @@ int main() req.quit(); net::io_context ioc {1}; - tcp::resolver resv(ioc); - tcp::socket socket {ioc}; + net::ip::tcp::resolver resv(ioc); + net::ip::tcp::socket socket {ioc}; net::connect(socket, resv.resolve("127.0.0.1", "6379")); resp::write(socket, req); diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index 177742d6..be93d50f 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -7,16 +7,10 @@ #pragma once -#include - -namespace aedis { -namespace net = boost::asio; -namespace ip = net::ip; -using tcp = ip::tcp; -} - #include #include #include #include #include +#include +#include diff --git a/include/aedis/command.hpp b/include/aedis/command.hpp index 4d187419..0f051644 100644 --- a/include/aedis/command.hpp +++ b/include/aedis/command.hpp @@ -7,7 +7,9 @@ #pragma once -namespace aedis { namespace resp { +#include + +namespace aedis { enum class command { append @@ -20,7 +22,7 @@ enum class command , exec , expire , flushall -, get // 10 +, get , hello , hget , hgetall @@ -30,7 +32,7 @@ enum class command , hmget , hset , hvals -, incr // 20 +, incr , keys , llen , lpop @@ -40,7 +42,7 @@ enum class command , multi , ping , psubscribe -, publish // 30 +, publish , quit , role , rpush @@ -50,7 +52,7 @@ enum class command , set , smembers , subscribe -, unsubscribe // 40 +, unsubscribe , zadd , zrange , zrangebyscore @@ -114,6 +116,11 @@ auto const* to_string(command c) } } -} -} +} // aedis +inline +std::ostream& operator<<(std::ostream& os, aedis::command c) +{ + os << to_string(c); + return os; +} diff --git a/include/aedis/config.hpp b/include/aedis/config.hpp new file mode 100644 index 00000000..cb581580 --- /dev/null +++ b/include/aedis/config.hpp @@ -0,0 +1,15 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include + +namespace aedis { +namespace net = boost::asio; +} + diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp new file mode 100644 index 00000000..6c705f62 --- /dev/null +++ b/include/aedis/connection.hpp @@ -0,0 +1,98 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include + +#include "config.hpp" +#include "type.hpp" +#include "request.hpp" + +namespace aedis { + +template +class connection : + public std::enable_shared_from_this> { +private: + net::steady_timer timer_; + net::ip::tcp::socket socket_; + std::queue> reqs_; + + void finish() + { + socket_.close(); + timer_.cancel(); + } + + template + net::awaitable + worker_coro( + Receiver& recv, + typename boost::asio::ip::tcp::resolver::results_type const& results) + { + auto ex = co_await net::this_coro::executor; + + boost::system::error_code ec; + + co_await async_connect( + socket_, + results, + net::redirect_error(net::use_awaitable, ec)); + + if (ec) { + finish(); + recv.on_error(ec); + co_return; + } + + resp::async_writer(socket_, reqs_, timer_, net::detached); + + ec = {}; + co_await co_spawn( + ex, + resp::async_reader(socket_, recv, reqs_), + net::redirect_error(net::use_awaitable, ec)); + + if (ec) { + finish(); + recv.on_error(ec); + co_return; + } + } + +public: + using event_type = Event; + + connection(net::io_context& ioc) + : timer_{ioc} + , socket_{ioc} + { + reqs_.push({}); + reqs_.front().hello(); + } + + template + void + start( + Receiver& recv, + typename boost::asio::ip::tcp::resolver::results_type const& results) + { + auto self = this->shared_from_this(); + net::co_spawn( + socket_.get_executor(), + [self, &recv, &results] () mutable { return self->worker_coro(recv, results); }, + net::detached); + } + + template + void send(Filler filler) + { queue_writer(reqs_, filler, timer_); } +}; + +} // aedis + diff --git a/include/aedis/read.hpp b/include/aedis/read.hpp index 275b9991..87de3230 100644 --- a/include/aedis/read.hpp +++ b/include/aedis/read.hpp @@ -21,6 +21,7 @@ #include #include +#include "config.hpp" #include "type.hpp" #include "parser.hpp" #include "response.hpp" @@ -344,15 +345,5 @@ async_reader( } } -template -std::queue> -make_request_queue() -{ - std::queue> reqs; - reqs.push({}); - reqs.front().hello(); - return reqs; -} - } // resp } // aedis diff --git a/include/aedis/receiver.hpp b/include/aedis/receiver.hpp new file mode 100644 index 00000000..3159cb01 --- /dev/null +++ b/include/aedis/receiver.hpp @@ -0,0 +1,44 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include +#include + +#include "type.hpp" +#include "utils.hpp" +#include "response.hpp" +#include "request.hpp" + +namespace aedis { + +template +class receiver_base { +public: + using event_type = Event; + + virtual void on_lrange(Event ev, resp::response_array::data_type& v) noexcept { } + virtual void on_hello(Event ev, resp::response_array::data_type& v) noexcept {} + virtual void on_ping(Event ev, resp::response_simple_string::data_type& v) noexcept { } + virtual void on_rpush(Event ev, resp::response_number::data_type& v) noexcept { } + + virtual void on_push(command cmd, Event ev, resp::response_array::data_type& v) noexcept { } + virtual void on_set(command cmd, Event ev, resp::response_array::data_type& v) noexcept { } + virtual void on_attribute(command cmd, Event ev, resp::response_array::data_type& v) noexcept { } + virtual void on_simple_error(command cmd, Event ev, resp::response_simple_error::data_type& v) noexcept { } + virtual void on_double(command cmd, Event ev, resp::response_double::data_type& v) noexcept { } + virtual void on_big_number(command cmd, Event ev, resp::response_big_number::data_type& v) noexcept { } + virtual void on_boolean(command cmd, Event ev, resp::response_bool::data_type& v) noexcept { } + virtual void on_blob_string(command cmd, Event ev, resp::response_blob_string::data_type& v) noexcept { } + virtual void on_blob_error(command cmd, Event ev, resp::response_blob_error::data_type& v) noexcept { } + virtual void on_verbatim_string(command cmd, Event ev, resp::response_verbatim_string::data_type& v) noexcept { } + virtual void on_streamed_string_part(command cmd, Event ev, resp::response_streamed_string_part::data_type& v) noexcept { } + virtual void on_error(boost::system::error_code ec) { } +}; + +} // aedis diff --git a/include/aedis/receiver_print.hpp b/include/aedis/receiver_print.hpp deleted file mode 100644 index c73d16f1..00000000 --- a/include/aedis/receiver_print.hpp +++ /dev/null @@ -1,114 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#pragma once - -#include -#include - -#include "type.hpp" -#include "utils.hpp" -#include "response.hpp" -#include "request.hpp" - -namespace aedis { namespace resp { - -template -class receiver_base { -public: - using event_type = Event; - - virtual void on_array(command cmd, Event ev, response_array::data_type& v) noexcept - { print(v); } - virtual void on_push(command cmd, Event ev, response_array::data_type& v) noexcept - { print(v); } - virtual void on_map(command cmd, Event ev, response_array::data_type& v) noexcept - { print(v); } - virtual void on_set(command cmd, Event ev, response_array::data_type& v) noexcept - { print(v); } - virtual void on_attribute(command cmd, Event ev, response_array::data_type& v) noexcept - { print(v); } - virtual void on_simple_string(command cmd, Event ev, response_simple_string::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_simple_error(command cmd, Event ev, response_simple_error::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_number(command cmd, Event ev, response_number::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_double(command cmd, Event ev, response_double::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_big_number(command cmd, Event ev, response_big_number::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_boolean(command cmd, Event ev, response_bool::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_blob_string(command cmd, Event ev, response_blob_string::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_blob_error(command cmd, Event ev, response_blob_error::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_verbatim_string(command cmd, Event ev, response_verbatim_string::data_type& v) noexcept - { std::cout << v << std::endl; } - virtual void on_streamed_string_part(command cmd, Event ev, response_streamed_string_part::data_type& v) noexcept - { std::cout << v << std::endl; } -}; - -template -class connection : - public std::enable_shared_from_this> { -private: - net::steady_timer st_; - tcp::resolver resv_; - tcp::socket socket_; - std::queue> reqs_; - - template - net::awaitable - reconnect_loop(Receiver& recv) - { - try { - auto ex = co_await net::this_coro::executor; - auto const r = resv_.resolve("127.0.0.1", "6379"); - co_await async_connect(socket_, r, net::use_awaitable); - resp::async_writer(socket_, reqs_, st_, net::detached); - co_await co_spawn( - ex, - resp::async_reader(socket_, recv, reqs_), - net::use_awaitable); - } catch (std::exception const& e) { - std::cout << e.what() << std::endl; - socket_.close(); - st_.cancel(); - } - } - -public: - using event_type = Event; - - connection(net::io_context& ioc) - : st_{ioc} - , resv_{ioc} - , socket_{ioc} - , reqs_ (resp::make_request_queue()) - { } - - template - void start(Receiver& recv) - { - net::co_spawn( - socket_.get_executor(), - [self = this->shared_from_this(), recv] () mutable { return self->reconnect_loop(recv); }, - net::detached); - } - - template - void send(Filler filler) - { queue_writer(reqs_, filler, st_); } - - auto& requests() {return reqs_;} - auto const& requests() const {return reqs_;} -}; - -} -} diff --git a/include/aedis/request.hpp b/include/aedis/request.hpp index a8542ca0..d8f62d45 100644 --- a/include/aedis/request.hpp +++ b/include/aedis/request.hpp @@ -15,9 +15,7 @@ #include #include -#include -#include - +#include "config.hpp" #include "command.hpp" namespace aedis { namespace resp { diff --git a/include/aedis/response.hpp b/include/aedis/response.hpp index 450381bb..2d961bb4 100644 --- a/include/aedis/response.hpp +++ b/include/aedis/response.hpp @@ -19,11 +19,12 @@ #include #include -#include - +#include "config.hpp" #include "type.hpp" #include "command.hpp" +#include + namespace aedis { namespace resp { template @@ -565,28 +566,44 @@ public: set_.result.clear(); break; case type::map: - recv.on_map(id.cmd, id.event, map_.result); + { + switch (id.cmd) { + case command::hello: recv.on_hello(id.event, map_.result); break; + default: {assert(false);} + } map_.result.clear(); - break; + } break; case type::attribute: recv.on_attribute(id.cmd, id.event, attribute_.result); attribute_.result.clear(); break; case type::array: - recv.on_array(id.cmd, id.event, array_.result); + { + switch (id.cmd) { + case command::lrange: recv.on_lrange(id.event, array_.result); break; + default: {assert(false);} + } array_.result.clear(); - break; + } break; case type::simple_error: recv.on_simple_error(id.cmd, id.event, simple_error_.result); simple_error_.result.clear(); break; case type::simple_string: - recv.on_simple_string(id.cmd, id.event, simple_string_.result); + { + switch (id.cmd) { + case command::ping: recv.on_ping(id.event, simple_string_.result); break; + default: {assert(false);} + } simple_string_.result.clear(); - break; + } break; case type::number: - recv.on_number(id.cmd, id.event, number_.result); - break; + { + switch (id.cmd) { + case command::rpush: recv.on_rpush(id.event, number_.result); break; + default: {assert(false);} + } + } break; case type::double_type: recv.on_double(id.cmd, id.event, double_.result); break; @@ -627,8 +644,8 @@ std::ostream& operator<<(std::ostream& os, aedis::resp::response_id const& id) { os - << std::left << std::setw(15) << aedis::resp::to_string(id.cmd) - << std::left << std::setw(20) << aedis::resp::to_string(id.t) + << std::left << std::setw(15) << id.cmd + << std::left << std::setw(20) << id.t << std::left << std::setw(4) << (int)id.event ; return os; diff --git a/include/aedis/response_types.hpp b/include/aedis/response_types.hpp new file mode 100644 index 00000000..50e399c0 --- /dev/null +++ b/include/aedis/response_types.hpp @@ -0,0 +1,26 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace aedis { namespace resp { + +} // resp +} // aedis + diff --git a/include/aedis/type.hpp b/include/aedis/type.hpp index ac648043..a2f87b07 100644 --- a/include/aedis/type.hpp +++ b/include/aedis/type.hpp @@ -7,6 +7,8 @@ #pragma once +#include + namespace aedis { namespace resp { enum class type @@ -82,3 +84,10 @@ auto to_type(char c) } // resp } // aedis + +inline +std::ostream& operator<<(std::ostream& os, aedis::resp::type t) +{ + os << to_string(t); + return os; +} diff --git a/include/aedis/write.hpp b/include/aedis/write.hpp index 9a2e44d3..2fd08059 100644 --- a/include/aedis/write.hpp +++ b/include/aedis/write.hpp @@ -10,10 +10,10 @@ #include #include -#include -#include +#include "config.hpp" +#include "request.hpp" -#include +#include namespace aedis { namespace resp {