diff --git a/Makefile b/Makefile index ef849bbe..63443e90 100644 --- a/Makefile +++ b/Makefile @@ -25,10 +25,8 @@ examples = examples += sync_basic examples += async_minimum examples += async_basic -examples += async_reconnect examples += async_all_hashes -examples += async_events -examples += async_pubsub +examples += async_low_level tests = tests += general diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 9141c79e..9a0bf3bb 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -5,17 +5,16 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -#include #include using namespace aedis; /* This example shows how to receive and send events. * - * 1. Store a shared_ptr to the connection in the receiver. + * 1. Create a connection obeject. * - * 2. Start sending commands after the hello command has been - * received. + * 2. Start sending commands after the hello command has been + * received. * * As a rule, every redis command is received in a function named * on_command. The user has to override the base class version to diff --git a/examples/async_events.cpp b/examples/async_events.cpp deleted file mode 100644 index 62e6288f..00000000 --- a/examples/async_events.cpp +++ /dev/null @@ -1,73 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include -#include - -namespace net = aedis::net; -namespace this_coro = net::this_coro; -using namespace aedis; -using tcp = net::ip::tcp; -using tcp_socket = net::use_awaitable_t<>::as_default_on_t; - -enum class myevents -{ ignore -, interesting1 -, interesting2 -}; - -net::awaitable example() -{ - try { - request req; - req.rpush("list", {1, 2, 3}); - req.lrange("list", 0, -1, myevents::interesting1); - req.sadd("set", std::set{3, 4, 5}); - req.smembers("set", myevents::interesting2); - req.quit(); - - auto ex = co_await this_coro::executor; - tcp::resolver resv(ex); - tcp_socket socket {ex}; - co_await net::async_connect(socket, resv.resolve("127.0.0.1", "6379")); - co_await async_write(socket, req); - - std::string buffer; - for (;;) { - switch (req.events.front().second) { - case myevents::interesting1: - { - resp::response_basic_array res; - co_await resp::async_read(socket, buffer, res); - print(res.result, "Interesting1"); - } break; - case myevents::interesting2: - { - resp::response_basic_set res; - co_await resp::async_read(socket, buffer, res); - print(res.result, "Interesting2"); - } break; - default: - { - resp::response_ignore res; - co_await resp::async_read(socket, buffer, res); - } - } - req.events.pop(); - } - } catch (std::exception const& e) { - std::cerr << e.what() << std::endl; - } -} - -int main() -{ - net::io_context ioc {1}; - co_spawn(ioc, example(), net::detached); - ioc.run(); -} - diff --git a/examples/async_low_level.cpp b/examples/async_low_level.cpp new file mode 100644 index 00000000..83536ddd --- /dev/null +++ b/examples/async_low_level.cpp @@ -0,0 +1,69 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include +#include + +using namespace aedis; + +// Low level async example. + +enum class events {one, two, ignore}; + +net::awaitable example() +{ + try { + auto ex = co_await net::this_coro::executor; + + request req; + req.rpush("list", {1, 2, 3}); + req.lrange("list", 0, -1, events::one); + req.sadd("set", std::set{3, 4, 5}); + req.smembers("set", events::two); + req.quit(); + + net::ip::tcp::resolver resv(ex); + auto const results = resv.resolve("127.0.0.1", "6379"); + net::ip::tcp::socket socket {ex}; + co_await net::async_connect(socket, results, net::use_awaitable); + co_await async_write(socket, req, net::use_awaitable); + + std::string buffer; + for (;;) { + switch (req.events.front().second) { + case events::one: + { + resp::response_basic_array res; + co_await async_read(socket, buffer, res, net::use_awaitable); + print(res.result, "one"); + } break; + case events::two: + { + resp::response_basic_array res; + co_await async_read(socket, buffer, res, net::use_awaitable); + print(res.result, "two"); + } break; + default: + { + resp::response_ignore res; + co_await async_read(socket, buffer, res, net::use_awaitable); + } + } + req.events.pop(); + } + } catch (std::exception const& e) { + std::cerr << e.what() << std::endl; + } +} + +int main() +{ + net::io_context ioc {1}; + net::co_spawn(ioc, example(), net::detached); + ioc.run(); +} + diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp deleted file mode 100644 index 834aca0f..00000000 --- a/examples/async_pubsub.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include -#include - -namespace net = aedis::net; -using namespace aedis; -using tcp = net::ip::tcp; - -enum class myevents -{ one -, two -, three -, ignore -}; - -void fill(request& req) -{ - req.ping(myevents::one); - req.rpush("list", {1, 2, 3}); - req.lrange("list"); - req.ping(myevents::two); -} - -class myreceiver : public receiver_base { -private: - std::shared_ptr> conn_; - -public: - using event_type = myevents; - - myreceiver(std::shared_ptr> conn) - : conn_{conn} - { } - - void on_hello(myevents ev, resp::array_type& v) noexcept override - { - print(v); - conn_->send(fill); - } -}; - -int main() -{ - net::io_context ioc {1}; - - tcp::resolver resolver{ioc}; - auto const results = resolver.resolve("127.0.0.1", "6379"); - - auto conn = std::make_shared>(ioc); - myreceiver recv{conn}; - - conn->start(recv, results); - ioc.run(); -} - - diff --git a/examples/async_reconnect.cpp b/examples/async_reconnect.cpp deleted file mode 100644 index d46ce483..00000000 --- a/examples/async_reconnect.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include -#include - -namespace net = aedis::net; -namespace this_coro = net::this_coro; -using namespace aedis; -using tcp = net::ip::tcp; -using tcp_socket = net::use_awaitable_t<>::as_default_on_t; -using stimer = net::use_awaitable_t<>::as_default_on_t; - -enum class events {ignore}; - -net::awaitable example1() -{ - auto ex = co_await this_coro::executor; - for (;;) { - try { - request req; - req.quit(); - - tcp::resolver resv(ex); - auto const r = resv.resolve("127.0.0.1", "6379"); - tcp_socket socket {ex}; - co_await async_connect(socket, r); - co_await async_write(socket, req); - - std::string buffer; - for (;;) { - resp::response_ignore res; - co_await resp::async_read(socket, buffer, res); - } - } catch (std::exception const& e) { - std::cerr << "Trying to reconnect ..." << std::endl; - stimer timer(ex, std::chrono::seconds{2}); - co_await timer.async_wait(); - } - } -} - -int main() -{ - net::io_context ioc {1}; - co_spawn(ioc, example1(), net::detached); - ioc.run(); -} - diff --git a/examples/sync_basic.cpp b/examples/sync_basic.cpp index c492259d..d2c827ab 100644 --- a/examples/sync_basic.cpp +++ b/examples/sync_basic.cpp @@ -10,15 +10,15 @@ using namespace aedis; -enum class events {ignore}; +enum events {one, two, ignore}; int main() { try { request req; req.hello(); - req.set("Password", {"12345"}); - req.get("Password"); + req.rpush("list", {1, 2, 3}); + req.lrange("list"); req.quit(); net::io_context ioc {1}; @@ -28,21 +28,26 @@ int main() write(socket, req); std::string buffer; - resp::response_map hello; - resp::read(socket, buffer, hello); - print(hello.result); - resp::response_simple_string set; - resp::read(socket, buffer, set); + resp::response_ignore hello; + read(socket, buffer, hello); - resp::response_blob_string get; - resp::read(socket, buffer, get); - std::cout << "get: " << get.result << std::endl; + resp::response_basic_number list_size; + read(socket, buffer, list_size); + std::cout << list_size.result << std::endl; + + resp::response_basic_array list; + read(socket, buffer, list); + print(list.result); + + resp::response_simple_string ok; + read(socket, buffer, ok); + std::cout << ok.result << std::endl; + + resp::response_ignore noop; + read(socket, buffer, noop); - resp::response_ignore quit; - resp::read(socket, buffer, quit); } catch (std::exception const& e) { std::cerr << e.what() << std::endl; } } - diff --git a/examples/sync_responses.cpp b/examples/sync_responses.cpp deleted file mode 100644 index 73eaef49..00000000 --- a/examples/sync_responses.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - */ - -#include - -namespace net = aedis::net; - -using namespace net; -using namespace aedis; - -int main() -{ - try { - resp::request req; - req.hello(); - req.rpush("list", {1, 2, 3}); - req.lrange("list"); - req.quit(); - - io_context ioc {1}; - tcp::resolver resv(ioc); - tcp::socket socket {ioc}; - net::connect(socket, resv.resolve("127.0.0.1", "6379")); - resp::write(socket, req); - - std::string buffer; - - resp::response_ignore hello; - resp::read(socket, buffer, hello); - - resp::response_number list_size; - resp::read(socket, buffer, list_size); - std::cout << list_size.result << std::endl; - - resp::response_list list; - resp::read(socket, buffer, list); - print(list.result); - - resp::response_simple_string ok; - resp::read(socket, buffer, ok); - std::cout << ok.result << std::endl; - - resp::response_ignore noop; - resp::read(socket, buffer, noop); - - } catch (std::exception const& e) { - std::cerr << e.what() << std::endl; - } -} - diff --git a/include/aedis/command.hpp b/include/aedis/command.hpp index 0f051644..a6e7de73 100644 --- a/include/aedis/command.hpp +++ b/include/aedis/command.hpp @@ -17,7 +17,7 @@ enum class command , bgrewriteaof , bgsave , bitcount -, client +, client_id , del , exec , expire @@ -71,7 +71,7 @@ auto const* to_string(command c) EXPAND_COMMAND_CASE(bgrewriteaof); EXPAND_COMMAND_CASE(bgsave); EXPAND_COMMAND_CASE(bitcount); - EXPAND_COMMAND_CASE(client); + EXPAND_COMMAND_CASE(client_id); EXPAND_COMMAND_CASE(del); EXPAND_COMMAND_CASE(exec); EXPAND_COMMAND_CASE(expire); diff --git a/include/aedis/receiver_base.hpp b/include/aedis/receiver_base.hpp index a43e7c3f..f9243f35 100644 --- a/include/aedis/receiver_base.hpp +++ b/include/aedis/receiver_base.hpp @@ -24,24 +24,37 @@ public: // Array virtual void on_lrange(Event ev, resp::array_type& v) noexcept { } - virtual void on_hello(Event ev, resp::array_type& v) noexcept {} + virtual void on_lpop(Event ev, resp::array_type& v) noexcept { } + + // Map + virtual void on_hello(Event ev, resp::map_type& v) noexcept {} // Simple string virtual void on_ping(Event ev, resp::simple_string_type& v) noexcept { } virtual void on_quit(Event ev, resp::simple_string_type& v) noexcept { } + virtual void on_flushall(Event ev, resp::simple_string_type& v) noexcept { } + virtual void on_ltrim(Event ev, resp::simple_string_type& v) noexcept { } + virtual void on_set(Event ev, resp::simple_string_type& v) noexcept { } // Number - virtual void on_rpush(Event ev, resp::response_number::data_type& v) noexcept { } + virtual void on_rpush(Event ev, resp::number_type v) noexcept { } + virtual void on_del(Event ev, resp::number_type v) noexcept { } + virtual void on_llen(Event ev, resp::number_type v) noexcept { } + virtual void on_publish(Event ev, resp::number_type v) noexcept { } + virtual void on_incr(Event ev, resp::number_type v) noexcept { } + + // Blob string + virtual void on_lpop(Event ev, resp::blob_string_type& v) noexcept { } + virtual void on_get(Event ev, resp::blob_string_type& v) noexcept { } - virtual void on_set(command cmd, Event ev, resp::array_type& v) noexcept { } virtual void on_double(command cmd, Event ev, resp::double_type& v) noexcept { } virtual void on_big_number(command cmd, Event ev, resp::big_number_type& v) noexcept { } virtual void on_boolean(command cmd, Event ev, resp::bool_type& v) noexcept { } - virtual void on_blob_string(command cmd, Event ev, resp::blob_string_type& v) noexcept { } virtual void on_verbatim_string(command cmd, Event ev, resp::verbatim_string_type& v) noexcept { } virtual void on_streamed_string_part(command cmd, Event ev, resp::streamed_string_part_type& v) noexcept { } virtual void on_error(boost::system::error_code ec) { } + // TODO: Introduce a push type. virtual void on_push(Event ev, resp::array_type& v) noexcept { } virtual void on_simple_error(command cmd, Event ev, resp::response_simple_error::data_type& v) noexcept { } virtual void on_blob_error(command cmd, Event ev, resp::response_blob_error::data_type& v) noexcept { } diff --git a/include/aedis/request.hpp b/include/aedis/request.hpp index a04417b6..98e24a6f 100644 --- a/include/aedis/request.hpp +++ b/include/aedis/request.hpp @@ -215,9 +215,21 @@ public: void lpop( std::string_view key, + int count = 1, Event e = Event::ignore) { - resp::assemble(payload, "LPOP", key); + if (count == 1) { + resp::assemble(payload, "LPOP", key); + } else { + auto par = {std::to_string(count)}; + resp::assemble( + payload, + "LPOP", + {key}, + std::cbegin(par), + std::cend(par)); + } + events.push({command::lpop, e}); } @@ -565,6 +577,7 @@ public: events.push({command::ltrim, e}); } + // TODO: Overload for vector del. auto del( std::string_view key, @@ -636,12 +649,12 @@ public: } auto - client( + client_id( std::string_view parameters, Event e = Event::ignore) { - resp::assemble(payload, "CLIENT", {parameters}); - events.push({command::client, e}); + resp::assemble(payload, "CLIENT ID", {parameters}); + events.push({command::client_id, e}); } }; diff --git a/include/aedis/response_buffers.hpp b/include/aedis/response_buffers.hpp index 3d6939c4..94b26705 100644 --- a/include/aedis/response_buffers.hpp +++ b/include/aedis/response_buffers.hpp @@ -26,8 +26,8 @@ private: response_tree tree_; response_array array_; response_array push_; - response_array set_; - response_array map_; + response_set set_; + response_map map_; response_array attribute_; response_simple_string simple_string_; response_simple_error simple_error_; @@ -101,13 +101,13 @@ public: switch (id.t) { case type::push: { - assert(id.t == type::invalid); + assert(id.t == type::push); recv.on_push(id.event, push_.result); push_.result.clear(); } break; case type::set: { - recv.on_set(id.cmd, id.event, set_.result); + //recv.on_set(id.cmd, id.event, set_.result); set_.result.clear(); } break; case type::map: @@ -122,6 +122,7 @@ public: { switch (id.cmd) { case command::lrange: recv.on_lrange(id.event, array_.result); break; + case command::lpop: recv.on_lpop(id.event, array_.result); break; default: {assert(false);} } array_.result.clear(); @@ -131,6 +132,9 @@ public: switch (id.cmd) { case command::ping: recv.on_ping(id.event, simple_string_.result); break; case command::quit: recv.on_quit(id.event, simple_string_.result); break; + case command::flushall: recv.on_flushall(id.event, simple_string_.result); break; + case command::ltrim: recv.on_ltrim(id.event, simple_string_.result); break; + case command::set: recv.on_set(id.event, simple_string_.result); break; default: {assert(false);} } simple_string_.result.clear(); @@ -139,6 +143,10 @@ public: { switch (id.cmd) { case command::rpush: recv.on_rpush(id.event, number_.result); break; + case command::del: recv.on_del(id.event, number_.result); break; + case command::llen: recv.on_llen(id.event, number_.result); break; + case command::publish: recv.on_publish(id.event, number_.result); break; + case command::incr: recv.on_incr(id.event, number_.result); break; default: {assert(false);} } } break; @@ -158,7 +166,11 @@ public: } break; case type::blob_string: { - recv.on_blob_string(id.cmd, id.event, blob_string_.result); + switch (id.cmd) { + case command::lpop: recv.on_lpop(id.event, blob_string_.result); break; + case command::get: recv.on_get(id.event, blob_string_.result); break; + default: {assert(false);} + } blob_string_.result.clear(); } break; case type::verbatim_string: diff --git a/include/aedis/response_types.hpp b/include/aedis/response_types.hpp index 2cd79d62..8dffae67 100644 --- a/include/aedis/response_types.hpp +++ b/include/aedis/response_types.hpp @@ -37,6 +37,7 @@ using double_type = response_double::data_type; using verbatim_string_type = response_verbatim_string::data_type; using streamed_string_part_type = response_streamed_string_part::data_type; using bool_type = response_bool::data_type; +using number_type = response_number::data_type; } // resp } // aedis diff --git a/tests/general.cpp b/tests/general.cpp index c19a6bda..d4721c92 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -21,7 +21,7 @@ namespace this_coro = net::this_coro; using namespace aedis; -enum class events {ignore}; +enum class events {one, two, three, ignore}; template void check_equal(T const& a, T const& b, std::string const& msg = "") @@ -32,7 +32,95 @@ void check_equal(T const& a, T const& b, std::string const& msg = "") std::cout << "Error: " << msg << std::endl; } -net::awaitable test_list() +class test_receiver : public receiver_base { +private: + std::shared_ptr> conn_; + + std::vector list_ {1 ,2, 3, 4, 5, 6}; + std::string set_ {"aaa"}; + +public: + using event_type = events; + test_receiver(std::shared_ptr> conn) : conn_{conn} { } + + void on_hello(events ev, resp::array_type& v) noexcept override + { + auto f = [this](auto& req) + { + req.flushall(); + req.ping(); + req.rpush("a", list_); + req.llen("a"); + req.lrange("a"); + req.ltrim("a", 2, -2); + req.lpop("a"); + req.lpop("a", 2); // Not working? + req.set("b", {set_}); + req.get("b"); + req.del("b"); + req.subscribe("channel"); + req.publish("channel", "message"); + req.incr("c"); + req.quit(); + }; + + conn_->disable_reconnect(); + conn_->send(f); + } + + virtual void on_push(events ev, resp::array_type& v) noexcept override + { + // TODO: Check the responses below. + // {"subscribe", "channel", "1"} + // {"message", "channel", "message"} + check_equal(1, 1, "push (receiver)"); + } + + void on_get(events ev, resp::blob_string_type& s) noexcept override + { check_equal(s, set_, "get (receiver)"); } + + void on_set(events ev, resp::simple_string_type& s) noexcept override + { check_equal(s, {"OK"}, "set (receiver)"); } + + void on_lpop(events ev, resp::blob_string_type& s) noexcept override + { check_equal(s, {"3"}, "lpop (receiver)"); } + + void on_lpop(events ev, resp::array_type& s) noexcept override + { check_equal(s, {"4", "5"}, "lpop(count) (receiver)"); } + + void on_ping(events ev, resp::simple_string_type& s) noexcept override + { check_equal(s, {"PONG"}, "ping (receiver)"); } + + void on_quit(events ev, resp::simple_string_type& s) noexcept override + { check_equal(s, {"OK"}, "quit (receiver)"); } + + void on_flushall(events ev, resp::simple_string_type& s) noexcept override + { check_equal(s, {"OK"}, "flushall (receiver)"); } + + void on_ltrim(events ev, resp::simple_string_type& s) noexcept override + { check_equal(s, {"OK"}, "ltrim (receiver)"); } + + void on_rpush(events ev, resp::number_type n) noexcept override + { check_equal(n, (resp::number_type)std::size(list_), "rpush (receiver)"); } + + void on_del(events ev, resp::number_type n) noexcept override + { check_equal((int)n, 1, "del (receiver)"); } + + void on_llen(events ev, resp::number_type n) noexcept override + { check_equal((int)n, 6, "llen (receiver)"); } + + void on_incr(events ev, resp::number_type n) noexcept override + { check_equal((int)n, 1, "incr (receiver)"); } + + void on_publish(events ev, resp::number_type n) noexcept override + { check_equal((int)n, 1, "publish (receiver)"); } + + void on_lrange(events ev, resp::array_type& v) noexcept override + { check_equal(v, {"1", "2", "3", "4", "5", "6"}, "lrange (receiver)"); } +}; + +net::awaitable +test_list(net::ip::tcp::resolver::results_type const& results) { std::vector list {1 ,2, 3, 4, 5, 6}; @@ -47,10 +135,8 @@ net::awaitable test_list() p.quit(); auto ex = co_await this_coro::executor; - tcp::resolver resv(ex); - auto const rr = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; - co_await async_connect(socket, rr); + co_await async_connect(socket, results); co_await async_write(socket, net::buffer(p.payload)); std::string buffer; @@ -102,8 +188,11 @@ net::awaitable test_list() } } -net::awaitable test_set() +net::awaitable +test_set(net::ip::tcp::resolver::results_type const& results) { + auto ex = co_await this_coro::executor; + // Tests whether the parser can handle payloads that contain the separator. std::string test_bulk1(10000, 'a'); test_bulk1[30] = '\r'; @@ -111,11 +200,8 @@ net::awaitable test_set() std::string test_bulk2 = "aaaaa"; - auto ex = co_await this_coro::executor; - tcp::resolver resv(ex); - auto const rr = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; - co_await async_connect(socket, rr); + co_await async_connect(socket, results); request p; p.hello("3"); @@ -528,8 +614,17 @@ int main(int argc, char* argv[]) co_spawn(ioc, verbatim_string(), net::detached); co_spawn(ioc, set(), net::detached); co_spawn(ioc, map(), net::detached); - co_spawn(ioc, test_list(), net::detached); - co_spawn(ioc, test_set(), net::detached); + + tcp::resolver resv(ioc); + auto const results = resv.resolve("127.0.0.1", "6379"); + + co_spawn(ioc, test_list(results), net::detached); + co_spawn(ioc, test_set(results), net::detached); + + auto conn = std::make_shared>(ioc); + test_receiver recv{conn}; + conn->start(recv, results); + ioc.run(); }