From 7cffa9a07236a3ca236a6ab0fb9b9b7cbdb0a06b Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 7 Feb 2021 09:52:03 +0100 Subject: [PATCH] Improvements in example and error handling. --- Makefile | 1 + examples/async_all_hashes.cpp | 12 +-- examples/async_basic.cpp | 69 +++++++++-------- examples/async_events.cpp | 4 +- examples/async_minimum.cpp | 38 ++++++++++ examples/async_pubsub.cpp | 2 +- examples/async_reconnect.cpp | 4 +- examples/sync_basic.cpp | 6 +- include/aedis/aedis.hpp | 2 +- include/aedis/connection.hpp | 60 +++++++++------ include/aedis/read.hpp | 75 +++++++++++++------ .../aedis/{receiver.hpp => receiver_base.hpp} | 6 ++ include/aedis/request.hpp | 4 +- include/aedis/response.hpp | 1 + include/aedis/utils.hpp | 3 +- include/aedis/write.hpp | 12 +-- tests/general.cpp | 8 +- 17 files changed, 207 insertions(+), 100 deletions(-) create mode 100644 examples/async_minimum.cpp rename include/aedis/{receiver.hpp => receiver_base.hpp} (93%) diff --git a/Makefile b/Makefile index e69daa59..ef849bbe 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,7 @@ LDFLAGS += -pthread examples = examples += sync_basic +examples += async_minimum examples += async_basic examples += async_reconnect examples += async_all_hashes diff --git a/examples/async_all_hashes.cpp b/examples/async_all_hashes.cpp index c81def9c..cf46465c 100644 --- a/examples/async_all_hashes.cpp +++ b/examples/async_all_hashes.cpp @@ -17,6 +17,8 @@ namespace this_coro = net::this_coro; using namespace net; using namespace aedis; +enum class events {ignore}; + struct foo { std::string id {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; std::string from {"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}; @@ -48,7 +50,7 @@ auto make_hset_arg(foo const& p) net::awaitable create_hashes() { std::vector posts(20000); - resp::request req; + resp::request req; req.flushall(); for (auto i = 0; i < std::ssize(posts); ++i) { std::string const name = "posts:" + std::to_string(i); @@ -71,7 +73,7 @@ net::awaitable create_hashes() net::awaitable read_hashes_coro() { - resp::request req; + resp::request req; req.keys("posts:*"); auto ex = co_await this_coro::executor; @@ -88,7 +90,7 @@ net::awaitable read_hashes_coro() //print(keys.result); // Generates the request to retrieve all hashes. - resp::request pv; + resp::request pv; for (auto const& o : keys.result) pv.hvals(o); pv.quit(); @@ -107,7 +109,7 @@ net::awaitable read_hashes_coro() void read_hashes(net::io_context& ioc) { - resp::request req; + resp::request req; req.keys("posts:*"); tcp::resolver resv(ioc); @@ -122,7 +124,7 @@ void read_hashes(net::io_context& ioc) resp::read(socket, buffer, keys); // Generates the request to retrieve all hashes. - resp::request pv; + resp::request pv; for (auto const& o : keys.result) pv.hvals(o); pv.quit(); diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 1f3fc513..55ebd9e2 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -8,54 +8,61 @@ #include #include -namespace net = aedis::net; using namespace aedis; -using tcp = net::ip::tcp; -enum class myevents -{ one -, two -, three -, ignore -}; +/* This example shows how to receive and send events. + * + * 1. Store a shared_ptr to the connection in the receiver. + * + * 2. Start sending commands after the hello command has been + * received. + * + * As a rule, every redis command is received in a function named + * on_command. The user has to override the base class version to + * start receiving events. + */ -void fill(resp::request& req) -{ - req.ping(myevents::one); - req.rpush("list", {1, 2, 3}); - req.lrange("list"); - req.ping(myevents::two); -} +enum class events {one, two, three, ignore}; -class myreceiver : public receiver_base { +class receiver : public receiver_base { private: - std::shared_ptr> conn_; + std::shared_ptr> conn_; public: - using event_type = myevents; + using event_type = events; + receiver(std::shared_ptr> conn) : conn_{conn} { } - myreceiver(std::shared_ptr> conn) - : conn_{conn} - { } - - void on_hello(myevents ev, resp::response_array::data_type& v) noexcept override + void on_hello(events ev, resp::response_array::data_type& v) noexcept override { - resp::print(v); - conn_->send(fill); + print(v, "HELLO"); + + auto f = [](auto& req) + { + req.ping(events::one); + req.quit(); + }; + + conn_->disable_reconnect(); + conn_->send(f); } + + void on_ping(events ev, resp::response_simple_string::data_type& s) noexcept override + { std::cout << "PING: " << s << std::endl; } + + void on_quit(events ev, resp::response_simple_string::data_type& s) noexcept override + { std::cout << "QUIT: " << s << std::endl; } }; int main() { net::io_context ioc {1}; - - tcp::resolver resolver{ioc}; + net::ip::tcp::resolver resolver{ioc}; auto const results = resolver.resolve("127.0.0.1", "6379"); - - auto conn = std::make_shared>(ioc); - myreceiver recv{conn}; - + auto conn = std::make_shared>(ioc); + receiver recv{conn}; conn->start(recv, results); ioc.run(); } + + diff --git a/examples/async_events.cpp b/examples/async_events.cpp index 27fac27e..aaf6dd94 100644 --- a/examples/async_events.cpp +++ b/examples/async_events.cpp @@ -43,13 +43,13 @@ net::awaitable example() { resp::response_basic_array res; co_await resp::async_read(socket, buffer, res); - resp::print(res.result, "Interesting1"); + print(res.result, "Interesting1"); } break; case myevents::interesting2: { resp::response_set res; co_await resp::async_read(socket, buffer, res); - resp::print(res.result, "Interesting2"); + print(res.result, "Interesting2"); } break; default: { diff --git a/examples/async_minimum.cpp b/examples/async_minimum.cpp new file mode 100644 index 00000000..38f9bdb6 --- /dev/null +++ b/examples/async_minimum.cpp @@ -0,0 +1,38 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include +#include + +using namespace aedis; + +/* This example shows the absolute minimum you need to stablish a + * connection with redis. + * + * 1. Write an enum class that defines your events. + * + * 2. Write a receiver. The receiver_base class below is not + * required if your receiver class supports the receiver + * concept. + * + * In the next examples we will see how to receive and write commands. + */ + +enum class events {one, two, three, ignore}; +struct receiver : public receiver_base { }; + +int main() +{ + net::io_context ioc {1}; + net::ip::tcp::resolver resolver{ioc}; + auto const results = resolver.resolve("127.0.0.1", "6379"); + auto conn = std::make_shared>(ioc); + receiver recv; + conn->start(recv, results); + ioc.run(); +} + diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp index 75713594..ae8e7a56 100644 --- a/examples/async_pubsub.cpp +++ b/examples/async_pubsub.cpp @@ -40,7 +40,7 @@ public: void on_hello(myevents ev, resp::response_array::data_type& v) noexcept override { - resp::print(v); + print(v); conn_->send(fill); } }; diff --git a/examples/async_reconnect.cpp b/examples/async_reconnect.cpp index b65074c0..1d4821d3 100644 --- a/examples/async_reconnect.cpp +++ b/examples/async_reconnect.cpp @@ -15,12 +15,14 @@ using tcp = net::ip::tcp; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using stimer = net::use_awaitable_t<>::as_default_on_t; +enum class events {ignore}; + net::awaitable example1() { auto ex = co_await this_coro::executor; for (;;) { try { - resp::request req; + resp::request req; req.quit(); tcp::resolver resv(ex); diff --git a/examples/sync_basic.cpp b/examples/sync_basic.cpp index 3fbc0c45..574f53b3 100644 --- a/examples/sync_basic.cpp +++ b/examples/sync_basic.cpp @@ -10,10 +10,12 @@ using namespace aedis; +enum class events {ignore}; + int main() { try { - resp::request req; + resp::request req; req.hello(); req.set("Password", {"12345"}); req.get("Password"); @@ -28,7 +30,7 @@ int main() std::string buffer; resp::response_flat_map hello; resp::read(socket, buffer, hello); - resp::print(hello.result); + print(hello.result); resp::response_simple_string set; resp::read(socket, buffer, set); diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index be93d50f..1e2de279 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -13,4 +13,4 @@ #include #include #include -#include +#include diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index 6c705f62..249378df 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -8,10 +8,12 @@ #pragma once #include +#include #include "config.hpp" #include "type.hpp" #include "request.hpp" +#include "response.hpp" namespace aedis { @@ -21,9 +23,12 @@ class connection : private: net::steady_timer timer_; net::ip::tcp::socket socket_; + std::string buffer_; + resp::response_buffers resps_; std::queue> reqs_; + bool reconnect_ = true; - void finish() + void finish_coro() { socket_.close(); timer_.cancel(); @@ -37,31 +42,39 @@ private: { auto ex = co_await net::this_coro::executor; + net::steady_timer timer {ex}; + std::chrono::seconds wait_interval {1}; + boost::system::error_code ec; + while (reconnect_) { + co_await async_connect( + socket_, + results, + net::redirect_error(net::use_awaitable, ec)); - co_await async_connect( - socket_, - results, - net::redirect_error(net::use_awaitable, ec)); + if (ec) { + finish_coro(); + recv.on_error(ec); + timer.expires_after(wait_interval); + co_await timer.async_wait(net::use_awaitable); + continue; + } - if (ec) { - finish(); - recv.on_error(ec); - co_return; - } + async_writer(socket_, reqs_, timer_, net::detached); - resp::async_writer(socket_, reqs_, timer_, net::detached); + ec = {}; + co_await co_spawn( + ex, + async_reader(socket_, buffer_, resps_, recv, reqs_, ec), + net::use_awaitable); - ec = {}; - co_await co_spawn( - ex, - resp::async_reader(socket_, recv, reqs_), - net::redirect_error(net::use_awaitable, ec)); - - if (ec) { - finish(); - recv.on_error(ec); - co_return; + if (ec) { + finish_coro(); + recv.on_error(ec); + timer.expires_after(wait_interval); + co_await timer.async_wait(net::use_awaitable); + continue; + } } } @@ -92,6 +105,11 @@ public: template void send(Filler filler) { queue_writer(reqs_, filler, timer_); } + + void disable_reconnect() + { + reconnect_ = false; + } }; } // aedis diff --git a/include/aedis/read.hpp b/include/aedis/read.hpp index 87de3230..ec76a11a 100644 --- a/include/aedis/read.hpp +++ b/include/aedis/read.hpp @@ -24,8 +24,8 @@ #include "config.hpp" #include "type.hpp" #include "parser.hpp" -#include "response.hpp" #include "request.hpp" +#include "response.hpp" namespace aedis { namespace resp { @@ -204,16 +204,20 @@ public: , boost::system::error_code ec = {} , std::size_t n = 0) { + if (ec) + return self.complete(ec); + if (std::empty(*buf_)) { net::async_read_until( stream_, net::dynamic_buffer(*buf_), "\r\n", std::move(self)); - } else { - *t_ = to_type(buf_->front()); - return self.complete(ec); + return; } + + *t_ = to_type(buf_->front()); + return self.complete(ec); } }; @@ -238,38 +242,56 @@ auto async_read_type( stream); } +} // resp + template < class AsyncReadWriteStream, - class Receiver> + class Storage, + class Receiver, + class ResponseBuffers> net::awaitable async_reader( AsyncReadWriteStream& socket, + Storage& buffer, + ResponseBuffers& resps, Receiver& recv, - std::queue>& reqs) + std::queue>& reqs, + boost::system::error_code& ec) { using event_type = typename Receiver::event_type; - using response_id_type = response_id; - - std::string buffer; - resp::response_buffers resps; + using response_id_type = resp::response_id; // Used to queue the events of a transaction. std::queue trans; for (;;) { - type t; - co_await async_read_type(socket, buffer, t, net::use_awaitable); + resp::type t; + co_await async_read_type( + socket, + buffer, + t, + net::redirect_error(net::use_awaitable, ec)); + + if (ec) + co_return; + auto& req = reqs.front(); - auto const cmd = t == type::push ? command::none : req.events.front().first; + auto const cmd = t == resp::type::push ? command::none : req.events.front().first; auto const is_multi = cmd == command::multi; auto const is_exec = cmd == command::exec; auto const trans_empty = std::empty(trans); - // The next two ifs are used to deal with transactions. if (is_multi || (!trans_empty && !is_exec)) { - response_static_string<6> tmp; - co_await async_read(socket, buffer, tmp, net::use_awaitable); + resp::response_static_string<6> tmp; + co_await async_read( + socket, + buffer, + tmp, + net::redirect_error(net::use_awaitable, ec)); + + if (ec) + co_return; // Failing to QUEUE a command inside a trasaction is // considered an application error. The multi commands @@ -280,7 +302,7 @@ async_reader( // Pushes the command in the transction command queue that will be // processed when exec arrives. - trans.push({req.events.front().first, type::invalid, req.events.front().second}); + trans.push({req.events.front().first, resp::type::invalid, req.events.front().second}); req.events.pop(); continue; } @@ -301,7 +323,10 @@ async_reader( socket, buffer, *tmp, - net::use_awaitable); + net::redirect_error(net::use_awaitable, ec)); + + if (ec) + co_return; trans.pop(); // Removes multi. resps.forward_transaction(std::move(trans), recv); @@ -314,7 +339,10 @@ async_reader( co_await async_write( socket, reqs.front(), - net::use_awaitable); + net::redirect_error(net::use_awaitable, ec)); + + if (ec) + co_return; } } continue; @@ -322,15 +350,19 @@ async_reader( response_id_type id{cmd, t, req.events.front().second}; auto* tmp = resps.get(id); + co_await async_read( socket, buffer, *tmp, - net::use_awaitable); + net::redirect_error(net::use_awaitable, ec)); + + if (ec) + co_return; resps.forward(id, recv); - if (t != type::push) + if (t != resp::type::push) req.events.pop(); if (std::empty(req.events)) { @@ -345,5 +377,4 @@ async_reader( } } -} // resp } // aedis diff --git a/include/aedis/receiver.hpp b/include/aedis/receiver_base.hpp similarity index 93% rename from include/aedis/receiver.hpp rename to include/aedis/receiver_base.hpp index 3159cb01..48c6bfaf 100644 --- a/include/aedis/receiver.hpp +++ b/include/aedis/receiver_base.hpp @@ -22,9 +22,15 @@ class receiver_base { public: using event_type = Event; + // Array virtual void on_lrange(Event ev, resp::response_array::data_type& v) noexcept { } virtual void on_hello(Event ev, resp::response_array::data_type& v) noexcept {} + + // Simple string virtual void on_ping(Event ev, resp::response_simple_string::data_type& v) noexcept { } + virtual void on_quit(Event ev, resp::response_simple_string::data_type& v) noexcept { } + + // Number virtual void on_rpush(Event ev, resp::response_number::data_type& v) noexcept { } virtual void on_push(command cmd, Event ev, resp::response_array::data_type& v) noexcept { } diff --git a/include/aedis/request.hpp b/include/aedis/request.hpp index d8f62d45..646b6860 100644 --- a/include/aedis/request.hpp +++ b/include/aedis/request.hpp @@ -131,11 +131,9 @@ void assemble(std::string& ret, std::string_view cmd, std::string_view key) assemble(ret, cmd, {key}, std::cbegin(dummy), std::cend(dummy)); } -enum class event {ignore}; - // TODO: Make the write functions friend of this class and make the // payload private. -template +template class request { public: std::string payload; diff --git a/include/aedis/response.hpp b/include/aedis/response.hpp index 2d961bb4..2aad707f 100644 --- a/include/aedis/response.hpp +++ b/include/aedis/response.hpp @@ -593,6 +593,7 @@ public: { switch (id.cmd) { case command::ping: recv.on_ping(id.event, simple_string_.result); break; + case command::quit: recv.on_quit(id.event, simple_string_.result); break; default: {assert(false);} } simple_string_.result.clear(); diff --git a/include/aedis/utils.hpp b/include/aedis/utils.hpp index 3439e127..a67fa17f 100644 --- a/include/aedis/utils.hpp +++ b/include/aedis/utils.hpp @@ -9,7 +9,7 @@ #include -namespace aedis { namespace resp { +namespace aedis { template void print(Iter begin, Iter end, char const* p) @@ -45,5 +45,4 @@ void print_command_raw(std::string const& data, int n) } } -} // resp } // aedis diff --git a/include/aedis/write.hpp b/include/aedis/write.hpp index 2fd08059..b3464c48 100644 --- a/include/aedis/write.hpp +++ b/include/aedis/write.hpp @@ -27,7 +27,7 @@ write( boost::system::error_code& ec) { static_assert(boost::beast::is_sync_write_stream::value, - "SyncWriteStream type requirements not met"); + "SyncWriteStream type requirements not met"); return write(stream, net::buffer(req.payload), ec); } @@ -71,13 +71,15 @@ async_write( return net::async_write(stream, net::buffer(req.payload), token); } +} // resp + template < class AsyncReadStream, class Event> struct writer_op { AsyncReadStream& stream; net::steady_timer& st; - std::queue>* reqs; + std::queue>* reqs; template void operator()( @@ -127,7 +129,7 @@ template < > auto async_writer( AsyncWriteStream& stream, - std::queue>& reqs, + std::queue>& reqs, net::steady_timer& writeTrigger, CompletionToken&& token = net::default_completion_token_t{}) @@ -141,7 +143,7 @@ auto async_writer( writeTrigger); } -// Returns true id a write has been triggered. +// Returns true if a write has been triggered. template bool queue_writer( std::queue>& reqs, @@ -159,7 +161,5 @@ bool queue_writer( return empty; } - -} // resp } // aedis diff --git a/tests/general.cpp b/tests/general.cpp index 87f61406..34005385 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -21,6 +21,8 @@ namespace this_coro = net::this_coro; using namespace aedis; +enum class events {ignore}; + template void check_equal(T const& a, T const& b, std::string const& msg = "") { @@ -34,7 +36,7 @@ net::awaitable test_list() { std::vector list {1 ,2, 3, 4, 5, 6}; - resp::request p; + resp::request p; p.hello("3"); p.flushall(); p.rpush("a", list); @@ -115,7 +117,7 @@ net::awaitable test_set() tcp_socket socket {ex}; co_await async_connect(socket, rr); - resp::request p; + resp::request p; p.hello("3"); p.flushall(); p.set("s", {test_bulk1});