diff --git a/README.md b/README.md index 621e6414..427e8161 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,65 @@ # Aedis Aedis is a low level redis client designed for scalability and to -provide an easy and intuitive interface. All protocol features are -supported (to the best of my knowledge), some of them are +provide an easy and intuitive interface. Some of the supported +features are +* RESP3: The new redis protocol. +* STL containers. * Command pipelines (essential for performance). -* TLS: Automatically supported since aedis uses ASIO's `AsyncReadStream`. -* ASIO asynchronous model where futures, callbacks and coroutines are - supported. +* TLS. +* Coroutines, futures and callbacks. + +At the moment the biggest missing parts are + +* Attribute data type: Its specification is incomplete in my opinion + and I found no meaningful way to test them as Redis itself doesn't + seem to be usign them. +* Push type: I still did not manage to generate the notifications so I + can test my implementation. ## Tutorial -Let us begin with a synchronous example +A simple example is enough to show many of the aedis features ```cpp int main() { try { - resp::pipeline p; - p.set("Password", {"12345"}); - p.quit(); + resp::request req; + req.hello(); + req.set("Password", {"12345"}); + req.get("Password"); + req.quit(); - io_context ioc {1}; + net::io_context ioc {1}; tcp::resolver resv(ioc); tcp::socket socket {ioc}; net::connect(socket, resv.resolve("127.0.0.1", "6379")); - net::write(socket, buffer(p.payload)); + net::write(socket, net::buffer(req.payload)); std::string buffer; for (;;) { - resp::response_simple_string res; - resp::read(socket, buffer, res); - std::cout << res.result << std::endl; + switch (req.events.front().first) { + case resp::command::hello: + { + resp::response_flat_map res; + resp::read(socket, buffer, res); + print(res.result); + } break; + case resp::command::get: + { + resp::response_blob_string res; + resp::read(socket, buffer, res); + std::cout << "get: " << res.result << std::endl; + } break; + default: + { + resp::response_ignore res; + resp::read(socket, buffer, res); + } + } + req.events.pop(); } } catch (std::exception const& e) { std::cerr << e.what() << std::endl; @@ -41,76 +69,69 @@ int main() The important things to notice above are +* The `hello` command is included in the request as required by RESP3. +* Many commands are sent in the same request, the so called pipeline. * We keep reading from the socket until it is closed by the redis - server (as requested by the quit command). + server as requested in the quit command. +* The response is parsed in an appropriate buffer. The `hello` command in a map and + the get into a string. -* The commands are composed with the `pipeline` class. - -* The response is parsed in an appropriate buffer `response_simple_string`. - -Converting the example above to use coroutines is trivial - -```cpp -net::awaitable example1() -{ - resp::pipeline p; - p.set("Password", {"12345"}); - p.quit(); - - auto ex = co_await this_coro::executor; - tcp::resolver resv(ex); - auto const r = resv.resolve("127.0.0.1", "6379"); - tcp_socket socket {ex}; - co_await async_connect(socket, r); - co_await async_write(socket, buffer(p.payload)); - - std::string buffer; - for (;;) { - resp::response_simple_string res; - co_await resp::async_read(socket, buffer, res); - std::cout << res.result << std::endl; - } -} -``` - -From now on we will use coroutines in the tutorial as this is how most -people should communicating to the redis server usually. +It is trivial to rewrite the example above to use coroutines, see +`examples/async_basic.cpp`. From now on we will use coroutines in the +tutorial as this is how most people should communicating to the redis +server usually. ### Response buffer To communicate efficiently with redis it is necessary to understand the possible response types. RESP3 spcifies the following data types -1. simple string -1. simple error -1. number -1. double -1. bool -1. big number -1. null -1. blob error -1. verbatim string -1. blob string -1. streamed string part +1. Simple string +1. Simple error +1. Number +1. Double +1. Bool +1. Big number +1. Null +1. Blob error +1. Verbatim string +1. Blob string +1. Streamed string part These data types can come in different aggregate types -1. array -1. push -1. set -1. map -1. attribute +1. Array +1. Push +1. Set +1. Map +1. Attribute -Aedis provides appropriate response types for each of the data and -aggregate types. For example +Aedis provides appropriate response types for each of them. + +### Events + +The request type used above keeps a `std::queue` of commands in the +order they are expected to arrive. In addition to that you can +specify your own events + +```cpp +enum class myevents +{ ignore +, list +, set +}; +``` +and pass it as argument to the request as follows ```cpp net::awaitable example() { try { - resp::pipeline p; + resp::request p; p.rpush("list", {1, 2, 3}); - p.lrange("list"); + p.lrange("list", 0, -1, myevents::interesting1); + p.sadd("set", std::set{3, 4, 5}); + p.smembers("set", myevents::interesting2); p.quit(); auto ex = co_await this_coro::executor; @@ -119,80 +140,26 @@ net::awaitable example() co_await net::async_connect(socket, resv.resolve("127.0.0.1", "6379")); co_await net::async_write(socket, net::buffer(p.payload)); - std::string buffer; - resp::response_number list_size; - co_await resp::async_read(socket, buffer, list_size); - - resp::response_list list; - co_await resp::async_read(socket, buffer, list); - - resp::response_simple_string ok; - co_await resp::async_read(socket, buffer, ok); - - resp::response noop; - co_await resp::async_read(socket, buffer, noop); - - } catch (std::exception const& e) { - std::cerr << e.what() << std::endl; - } -} -``` - -Usually the commands that are sent to redis are determined dynamically -so it is not possible to structure the code like above, to deal with -that it we support events. - -#### Events - -To use events, define an enum class like the one below - -```cpp -enum class myevents -{ ignore -, list -, set -}; -``` - -and pass it as argument to the pipeline commands as below - -```cpp -net::awaitable example() -{ - try { - resp::pipeline p; - p.rpush("list", {1, 2, 3}); - p.lrange("list", 0, -1, myevents::list); - p.sadd("set", std::set{3, 4, 5}); - p.smembers("set", myevents::set); - p.quit(); - - auto ex = co_await this_coro::executor; - tcp::resolver resv(ex); - tcp_socket socket {ex}; - co_await net::async_connect(socket, resv.resolve("127.0.0.1", "6379")); - co_await net::async_write(socket, buffer(p.payload)); - std::string buffer; for (;;) { - switch (p.events.front()) { - case myevents::list: - { - resp::response_list res; - co_await resp::async_read(socket, buffer, res); - print(res.result); - } break; - case myevents::set: - { - resp::response_set res; - co_await resp::async_read(socket, buffer, res); - print(res.result); - } break; - default: - { - resp::response res; - co_await resp::async_read(socket, buffer, res); - } + switch (p.events.front().second) { + case myevents::interesting1: + { + resp::response_list res; + co_await resp::async_read(socket, buffer, res); + print(res.result); + } break; + case myevents::interesting2: + { + resp::response_set res; + co_await resp::async_read(socket, buffer, res); + print(res.result); + } break; + default: + { + resp::response_ignore res; + co_await resp::async_read(socket, buffer, res); + } } p.events.pop(); } @@ -204,15 +171,15 @@ net::awaitable example() ## Reconnecting and Sentinel support -In production we need a reconnect mechanism, some of the reasons are +In production we usually need a way to reconnect to the redis server +after a disconnect, some of the reasons are 1. The server has crashed and has been restarted by systemd. 1. All connection have been killed by the admin. 1. A failover operation has started. -### Simple reconnet - -It is trivial to implement a reconnect using a coroutine +It is easy to implement such a mechnism in scalable way using +coroutines, for example ```cpp net::awaitable example1() @@ -220,34 +187,29 @@ net::awaitable example1() auto ex = co_await this_coro::executor; for (;;) { try { - resp::pipeline p; - p.set("Password", {"12345"}); + resp::request p; p.quit(); tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_await async_write(socket, buffer(p.payload)); + co_await async_write(socket, net::buffer(p.payload)); std::string buffer; for (;;) { - resp::response_string res; + resp::response_ignore res; co_await resp::async_read(socket, buffer, res); - std::cout << res.result << std::endl; } } catch (std::exception const& e) { - std::cerr << "Error: " << e.what() << std::endl; - stimer timer(ex); - timer.expires_after(std::chrono::seconds{2}); + std::cerr << "Trying to reconnect ..." << std::endl; + stimer timer(ex, std::chrono::seconds{2}); co_await timer.async_wait(); } } } ``` -For many usecases this is often enough. A more sophisticated reconnect -strategy however is to use a redis-sentinel. - -### Sentinel +More sophisticated reconnect strategies using sentinel are also easy +to implement using coroutines. diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 31295528..9b823a80 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -9,39 +9,58 @@ #include namespace net = aedis::net; -using tcp = net::ip::tcp; -using tcp_socket = net::use_awaitable_t<>::as_default_on_t; - namespace this_coro = net::this_coro; - -using namespace net; using namespace aedis; +using tcp_socket = net::use_awaitable_t<>::as_default_on_t; net::awaitable example1() { - resp::request p; - p.set("Password", {"12345"}); - p.quit(); + try { + resp::request req; + req.hello(); + req.set("Password", {"12345"}); + req.get("Password"); + req.quit(); - auto ex = co_await this_coro::executor; - tcp::resolver resv(ex); - auto const r = resv.resolve("127.0.0.1", "6379"); - tcp_socket socket {ex}; - co_await async_connect(socket, r); - co_await async_write(socket, buffer(p.payload)); + auto ex = co_await this_coro::executor; + tcp::resolver resv(ex); + auto const r = resv.resolve("127.0.0.1", "6379"); + tcp_socket socket {ex}; + co_await async_connect(socket, r); + co_await async_write(socket, net::buffer(req.payload)); - std::string buffer; - for (;;) { - resp::response_simple_string res; - co_await resp::async_read(socket, buffer, res); - std::cout << res.result << std::endl; + std::string buffer; + for (;;) { + switch (req.events.front().first) { + case resp::command::hello: + { + resp::response_flat_map res; + co_await resp::async_read(socket, buffer, res); + print(res.result); + } break; + case resp::command::get: + { + resp::response_blob_string res; + co_await resp::async_read(socket, buffer, res); + std::cout << "get: " << res.result << std::endl; + } break; + default: + { + resp::response_ignore res; + co_await resp::async_read(socket, buffer, res); + } + } + req.events.pop(); + } + } catch (std::exception const& e) { + std::cerr << e.what() << std::endl; } } int main() { - io_context ioc {1}; - co_spawn(ioc, example1(), detached); + net::io_context ioc {1}; + net::co_spawn(ioc, example1(), net::detached); ioc.run(); } diff --git a/examples/async_events.cpp b/examples/async_events.cpp index e84bec27..cb9659fd 100644 --- a/examples/async_events.cpp +++ b/examples/async_events.cpp @@ -8,18 +8,15 @@ #include namespace net = aedis::net; +namespace this_coro = net::this_coro; +using namespace aedis; using tcp = net::ip::tcp; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; -namespace this_coro = net::this_coro; - -using namespace net; -using namespace aedis; - enum class myevents { ignore -, list -, set +, interesting1 +, interesting2 }; net::awaitable example() @@ -27,37 +24,37 @@ net::awaitable example() try { resp::request p; p.rpush("list", {1, 2, 3}); - p.lrange("list", 0, -1, myevents::list); + p.lrange("list", 0, -1, myevents::interesting1); p.sadd("set", std::set{3, 4, 5}); - p.smembers("set", myevents::set); + p.smembers("set", myevents::interesting2); p.quit(); auto ex = co_await this_coro::executor; tcp::resolver resv(ex); tcp_socket socket {ex}; co_await net::async_connect(socket, resv.resolve("127.0.0.1", "6379")); - co_await net::async_write(socket, buffer(p.payload)); + co_await net::async_write(socket, net::buffer(p.payload)); std::string buffer; for (;;) { switch (p.events.front().second) { - case myevents::list: - { - resp::response_list res; - co_await resp::async_read(socket, buffer, res); - print(res.result); - } break; - case myevents::set: - { - resp::response_set res; - co_await resp::async_read(socket, buffer, res); - print(res.result); - } break; - default: - { - resp::response_ignore res; - co_await resp::async_read(socket, buffer, res); - } + case myevents::interesting1: + { + resp::response_list res; + co_await resp::async_read(socket, buffer, res); + print(res.result); + } break; + case myevents::interesting2: + { + resp::response_set res; + co_await resp::async_read(socket, buffer, res); + print(res.result); + } break; + default: + { + resp::response_ignore res; + co_await resp::async_read(socket, buffer, res); + } } p.events.pop(); } @@ -68,8 +65,8 @@ net::awaitable example() int main() { - io_context ioc {1}; - co_spawn(ioc, example(), detached); + net::io_context ioc {1}; + co_spawn(ioc, example(), net::detached); ioc.run(); } diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp index 85cc53a9..1f1db91a 100644 --- a/examples/async_pubsub.cpp +++ b/examples/async_pubsub.cpp @@ -30,10 +30,12 @@ net::awaitable publisher() std::string buffer; for (;;) { resp::request p; + p.hello(); p.publish("channel", "12345"); co_await async_write(socket, net::buffer(p.payload)); resp::response_ignore res; co_await resp::async_read(socket, buffer, res); + co_await resp::async_read(socket, buffer, res); stimer timer(ex); timer.expires_after(std::chrono::seconds{2}); co_await timer.async_wait(); diff --git a/examples/async_reconnect.cpp b/examples/async_reconnect.cpp index 954efd2b..613ea8c2 100644 --- a/examples/async_reconnect.cpp +++ b/examples/async_reconnect.cpp @@ -9,38 +9,33 @@ #include namespace net = aedis::net; +namespace this_coro = net::this_coro; +using namespace aedis; using tcp = net::ip::tcp; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using stimer = net::use_awaitable_t<>::as_default_on_t; -namespace this_coro = net::this_coro; - -using namespace net; -using namespace aedis; - net::awaitable example1() { auto ex = co_await this_coro::executor; for (;;) { try { resp::request p; - p.set("Password", {"12345"}); p.quit(); tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_await async_write(socket, buffer(p.payload)); + co_await async_write(socket, net::buffer(p.payload)); std::string buffer; for (;;) { - resp::response_simple_string res; + resp::response_ignore res; co_await resp::async_read(socket, buffer, res); - std::cout << res.result << std::endl; } } catch (std::exception const& e) { - std::cerr << "Error: " << e.what() << std::endl; + std::cerr << "Trying to reconnect ..." << std::endl; stimer timer(ex, std::chrono::seconds{2}); co_await timer.async_wait(); } @@ -49,8 +44,8 @@ net::awaitable example1() int main() { - io_context ioc {1}; - co_spawn(ioc, example1(), detached); + net::io_context ioc {1}; + co_spawn(ioc, example1(), net::detached); ioc.run(); } diff --git a/examples/async_responses.cpp b/examples/async_responses.cpp index 40c431d5..ad406ff7 100644 --- a/examples/async_responses.cpp +++ b/examples/async_responses.cpp @@ -22,6 +22,7 @@ net::awaitable example() { try { resp::request p; + p.hello(); p.rpush("list", {1, 2, 3}); p.lrange("list"); p.quit(); @@ -33,6 +34,9 @@ net::awaitable example() co_await net::async_write(socket, net::buffer(p.payload)); std::string buffer; + resp::response_ignore hello; + co_await resp::async_read(socket, buffer, hello); + resp::response_number list_size; co_await resp::async_read(socket, buffer, list_size); std::cout << list_size.result << std::endl; diff --git a/examples/sync_basic.cpp b/examples/sync_basic.cpp index c9741ae6..de0e9927 100644 --- a/examples/sync_basic.cpp +++ b/examples/sync_basic.cpp @@ -8,28 +8,45 @@ #include namespace net = aedis::net; - -using namespace net; using namespace aedis; int main() { try { - resp::request p; - p.set("Password", {"12345"}); - p.quit(); + resp::request req; + req.hello(); + req.set("Password", {"12345"}); + req.get("Password"); + req.quit(); - io_context ioc {1}; + net::io_context ioc {1}; tcp::resolver resv(ioc); tcp::socket socket {ioc}; net::connect(socket, resv.resolve("127.0.0.1", "6379")); - net::write(socket, buffer(p.payload)); + net::write(socket, net::buffer(req.payload)); std::string buffer; for (;;) { - resp::response_simple_string res; - resp::read(socket, buffer, res); - std::cout << res.result << std::endl; + switch (req.events.front().first) { + case resp::command::hello: + { + resp::response_flat_map res; + resp::read(socket, buffer, res); + print(res.result); + } break; + case resp::command::get: + { + resp::response_blob_string res; + resp::read(socket, buffer, res); + std::cout << "get: " << res.result << std::endl; + } break; + default: + { + resp::response_ignore res; + resp::read(socket, buffer, res); + } + } + req.events.pop(); } } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/sync_events.cpp b/examples/sync_events.cpp index 3bf83c1e..d32ad5d0 100644 --- a/examples/sync_events.cpp +++ b/examples/sync_events.cpp @@ -14,18 +14,19 @@ using namespace aedis; enum class myevents { ignore -, list -, set +, interesting1 +, interesting2 }; int main() { try { resp::request p; + p.hello(); p.rpush("list", {1, 2, 3}); - p.lrange("list", 0, -1, myevents::list); + p.lrange("list", 0, -1, myevents::interesting1); p.sadd("set", std::set{3, 4, 5}); - p.smembers("set", myevents::set); + p.smembers("set", myevents::interesting2); p.quit(); io_context ioc {1}; @@ -37,23 +38,23 @@ int main() std::string buffer; for (;;) { switch (p.events.front().second) { - case myevents::list: - { - resp::response_list res; - resp::read(socket, buffer, res); - print(res.result); - } break; - case myevents::set: - { - resp::response_set res; - resp::read(socket, buffer, res); - print(res.result); - } break; - default: - { - resp::response_ignore res; - resp::read(socket, buffer, res); - } + case myevents::interesting1: + { + resp::response_list res; + resp::read(socket, buffer, res); + print(res.result); + } break; + case myevents::interesting2: + { + resp::response_set res; + resp::read(socket, buffer, res); + print(res.result); + } break; + default: + { + resp::response_ignore res; + resp::read(socket, buffer, res); + } } p.events.pop(); } diff --git a/examples/sync_responses.cpp b/examples/sync_responses.cpp index 986dd8ea..d88842d7 100644 --- a/examples/sync_responses.cpp +++ b/examples/sync_responses.cpp @@ -16,6 +16,7 @@ int main() { try { resp::request p; + p.hello(); p.rpush("list", {1, 2, 3}); p.lrange("list"); p.quit(); @@ -27,6 +28,10 @@ int main() net::write(socket, buffer(p.payload)); std::string buffer; + + resp::response_ignore hello; + resp::read(socket, buffer, hello); + resp::response_number list_size; resp::read(socket, buffer, list_size); std::cout << list_size.result << std::endl; diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index 243f6661..177742d6 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -7,6 +7,14 @@ #pragma once +#include + +namespace aedis { +namespace net = boost::asio; +namespace ip = net::ip; +using tcp = ip::tcp; +} + #include #include #include diff --git a/include/aedis/read.hpp b/include/aedis/read.hpp index de90c840..17456c5d 100644 --- a/include/aedis/read.hpp +++ b/include/aedis/read.hpp @@ -20,17 +20,7 @@ #include #include -#include - -namespace aedis -{ - -namespace net = boost::asio; -namespace ip = net::ip; -using tcp = ip::tcp; - -namespace resp -{ +namespace aedis { namespace resp { // Converts a decimal number in ascii format to an integer. inline diff --git a/include/aedis/request.hpp b/include/aedis/request.hpp index 5c274c53..f1aca977 100644 --- a/include/aedis/request.hpp +++ b/include/aedis/request.hpp @@ -179,7 +179,9 @@ enum class command enum class event {ignore}; template -struct request { +class request { +private: +public: std::string payload; std::queue> events; @@ -294,10 +296,10 @@ public: void hello( - std::string_view key, + std::string_view version = "3", Event e = Event::ignore) { - resp::assemble(payload, "HELLO", key); + resp::assemble(payload, "HELLO", version); events.push({command::hello, e}); } diff --git a/include/aedis/response.hpp b/include/aedis/response.hpp index 3911fa25..9d364975 100644 --- a/include/aedis/response.hpp +++ b/include/aedis/response.hpp @@ -418,7 +418,6 @@ private: { from_string_view(s, result[i++]); } public: - std::array result; };