diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 37b84de4..1a544b82 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -6,6 +6,7 @@ */ #include +#include namespace net = aedis::net; @@ -40,7 +41,7 @@ net::awaitable example() for (;;) { resp::response_array hello; co_await resp::async_read(socket, buffer, hello); - print(hello.result); + resp::print(hello.result); } } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/async_events.cpp b/examples/async_events.cpp index ace856c2..c4ed7fbb 100644 --- a/examples/async_events.cpp +++ b/examples/async_events.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -6,6 +6,7 @@ */ #include +#include namespace net = aedis::net; namespace this_coro = net::this_coro; @@ -42,13 +43,13 @@ net::awaitable example() { resp::response_list res; co_await resp::async_read(socket, buffer, res); - print(res.result, "Interesting1"); + resp::print(res.result, "Interesting1"); } break; case myevents::interesting2: { resp::response_set res; co_await resp::async_read(socket, buffer, res); - print(res.result, "Interesting2"); + resp::print(res.result, "Interesting2"); } break; default: { diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp index 9632ba0f..7a9b67b3 100644 --- a/examples/async_pubsub.cpp +++ b/examples/async_pubsub.cpp @@ -23,54 +23,142 @@ using tcp = net::ip::tcp; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using stimer = net::use_awaitable_t<>::as_default_on_t; +template +struct resp_id { + resp::command cmd; + resp::type type; + Event ev; +}; + +template +struct responses { + resp::response_simple_string simple_string; + resp::response_blob_string blob_string; + resp::response_array array; + resp::response_transaction resp_trans; + std::queue> trans; +}; + +enum class myevent {zero, one, two, ignore}; + +#define EXPAND_MYEVENT_CASE(x) case myevent::x: return #x + +inline +auto to_string(myevent t) +{ + switch (t) { + EXPAND_MYEVENT_CASE(zero); + EXPAND_MYEVENT_CASE(one); + EXPAND_MYEVENT_CASE(two); + EXPAND_MYEVENT_CASE(ignore); + default: assert(false); + } +} + +using event_type = myevent; + struct receiver { - void receive(resp::command cmd, resp::type t, std::vector v) + void receive(resp_id const& id, std::vector v) { std::cout - << std::left << std::setw(20) << resp::to_string(cmd) - //<< std::left << std::setw(20) << (int)cmd - << std::left << resp::to_string(t) + << std::left << std::setw(15) << resp::to_string(id.cmd) + << std::left << std::setw(20) << resp::to_string(id.type) + << std::left << std::setw(20) << to_string(id.ev) + << v.back() << std::endl; } }; +void fill_request(resp::request& req) +{ + req.hello(); + req.flushall(); + req.subscribe("channel"); + req.subscribe("__keyspace@0__:user:*"); + req.ping(myevent::one); + req.set("aaaa", {std::to_string(1)}); + req.get("aaaa"); + req.del("aaaa"); + req.rpush("user:Marcelo", {1, 2, 3}, myevent::two); + req.lrange("user:Marcelo"); + req.publish("channel", "Some message"); + req.multi(); + req.lrange("user:Marcelo", 0, -1, myevent::zero); + req.exec(); + req.set("aaaa", {std::to_string(2)}); + req.get("aaaa"); + req.multi(); + req.lrange("user:Marcelo"); + req.ping(); + req.lrange("user:Marcelo", 0, -1, myevent::zero); + req.ping(); + req.lrange("user:Marcelo"); + req.ping(); + req.lrange("user:Marcelo"); + req.lrange("user:Marcelo"); + req.exec(); + //req.set("eee", {std::to_string(8)}); + //req.get("eee"); + //req.del("eee"); +} + +// A coroutine that adds commands to the request continously net::awaitable -publisher(tcp_socket& socket, resp::request& req) +filler( + std::queue>& reqs, + net::steady_timer& trigger) { auto ex = co_await this_coro::executor; try { - req.hello(); - req.flushall(); - req.subscribe("channel"); - req.subscribe("__keyspace@0__:user:*"); - req.ping(); - req.set("aaaa", {std::to_string(1)}); - req.get("aaaa"); - req.del("aaaa"); - req.rpush("user:Marcelo", {1, 2, 3}); - req.lrange("user:Marcelo"); - req.publish("channel", "Some message"); - req.multi(); - req.lrange("user:Marcelo"); - req.exec(); - req.set("aaaa", {std::to_string(2)}); - req.get("aaaa"); - req.multi(); - req.lrange("user:Marcelo"); - req.ping(); - req.lrange("user:Marcelo"); - req.ping(); - req.lrange("user:Marcelo"); - req.ping(); - req.lrange("user:Marcelo"); - req.exec(); - //req.quit(); + for (;;) { + std::cout << "filling" << std::endl; + resp::request req; + fill_request(req); + auto const empty = std::empty(reqs); + reqs.push(req); + if (empty) { + std::cout << "filler triggering" << std::endl; + trigger.cancel(); + } - co_await async_write(socket, req); - stimer timer(ex, std::chrono::seconds{2}); - co_await timer.async_wait(); + std::cout << "filler waiting" << std::endl; + stimer timer(ex, std::chrono::milliseconds{20}); + co_await timer.async_wait(); + std::cout << "filler waiting end" << std::endl; + } } catch (std::exception const& e) { - std::cerr << "Error: " << e.what() << std::endl; + std::cerr << "filler: " << e.what() << std::endl; + } +} + +// A coroutine that will write requests to redis. +net::awaitable +publisher( + tcp_socket& socket, + net::steady_timer& trigger, + std::queue>& reqs) +{ + auto ex = co_await this_coro::executor; + for (;;) { + if (!std::empty(reqs)) { + std::cout << "Writing ..." << std::endl; + assert(!std::empty(reqs.front())); + co_await async_write(socket, reqs.front()); + } + + std::cout << "Waiting to write ..." << std::endl; + trigger.expires_after(std::chrono::years{2}); + boost::system::error_code ec; + co_await trigger.async_wait(net::redirect_error(net::use_awaitable, ec)); + if (!socket.is_open()) { + std::cout << "aaa" << std::endl; + co_return; + } + if (ec == net::error::operation_aborted) { + } else { + std::cout << "Publisher leaving." << std::endl; + co_return; + } } } @@ -78,17 +166,20 @@ net::awaitable subscriber() { auto ex = co_await this_coro::executor; try { + net::steady_timer trigger {ex}; tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - resp::request req; - co_spawn(ex, publisher(socket, req), net::detached); + std::queue> reqs; + co_spawn(ex, publisher(socket, trigger, reqs), net::detached); + co_spawn(ex, filler(reqs, trigger), net::detached); std::string buffer; receiver recv; - resp::responses resps; + responses resps; for (;;) { + auto& req = reqs.front(); resp::type type; co_await async_read_type(socket, buffer, type); auto cmd = resp::command::none; @@ -103,40 +194,52 @@ net::awaitable subscriber() auto const* res = cmd == resp::command::multi ? "OK" : "QUEUED"; co_await resp::async_read(socket, buffer, resps.blob_string); assert(resps.blob_string.result == res); - resps.trans.push(req.events.front().first); + resps.trans.push({req.events.front().first, resp::type::invalid, req.events.front().second}); req.events.pop(); continue; } if (cmd == resp::command::exec) { - assert(resps.trans.front() == resp::command::multi); - co_await resp::async_read(socket, buffer, resps.depth1); - //assert(std::size(resps.trans) == std::size(resps.depth1)); + assert(resps.trans.front().cmd == resp::command::multi); + co_await resp::async_read(socket, buffer, resps.resp_trans); resps.trans.pop(); // Removes multi. int i = 0; while (!std::empty(resps.trans)) { - // TODO: type must come from the queue. - // TODO: resps.depth1 is wrong. - recv.receive(resps.trans.front(), type, {}); + resps.trans.front().type = resps.resp_trans.at(i).t; + recv.receive(resps.trans.front(), resps.resp_trans.at(i).value); resps.trans.pop(); ++i; } - resps.depth1.clear(); + resps.resp_trans.clear(); resps.trans = {}; req.events.pop(); // exec + if (std::empty(req.events)) { + std::cout << "reader ready ..." << std::endl; + reqs.pop(); + if (!std::empty(reqs)) { + std::cout << "reader triggering ..." << std::endl; + trigger.cancel(); + } + } continue; } resp::response_array array; co_await resp::async_read(socket, buffer, array); - recv.receive(cmd, type, std::move(array.result)); + recv.receive({cmd, type, req.events.front().second}, std::move(array.result)); array.result.clear(); if (type != resp::type::push) req.events.pop(); - if (std::empty(req.events)) - req.clear(); + if (std::empty(req.events)) { + std::cout << "reader ready ..." << std::endl; + reqs.pop(); + if (!std::empty(reqs)) { + std::cout << "reader triggering ..." << std::endl; + trigger.cancel(); + } + } } } catch (std::exception const& e) { std::cout << e.what() << std::endl; diff --git a/examples/sync_basic.cpp b/examples/sync_basic.cpp index e948785f..59752053 100644 --- a/examples/sync_basic.cpp +++ b/examples/sync_basic.cpp @@ -6,6 +6,7 @@ */ #include +#include namespace net = aedis::net; using namespace aedis; @@ -28,7 +29,7 @@ int main() std::string buffer; resp::response_flat_map hello; resp::read(socket, buffer, hello); - print(hello.result); + resp::print(hello.result); resp::response_simple_string set; resp::read(socket, buffer, set); diff --git a/include/aedis/command.hpp b/include/aedis/command.hpp new file mode 100644 index 00000000..4d187419 --- /dev/null +++ b/include/aedis/command.hpp @@ -0,0 +1,119 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +namespace aedis { namespace resp { + +enum class command +{ append +, auth +, bgrewriteaof +, bgsave +, bitcount +, client +, del +, exec +, expire +, flushall +, get // 10 +, hello +, hget +, hgetall +, hincrby +, hkeys +, hlen +, hmget +, hset +, hvals +, incr // 20 +, keys +, llen +, lpop +, lpush +, lrange +, ltrim +, multi +, ping +, psubscribe +, publish // 30 +, quit +, role +, rpush +, sadd +, scard +, sentinel +, set +, smembers +, subscribe +, unsubscribe // 40 +, zadd +, zrange +, zrangebyscore +, zremrangebyscore +, none +}; + +#define EXPAND_COMMAND_CASE(x) case command::x: return #x + +inline +auto const* to_string(command c) +{ + switch (c) { + EXPAND_COMMAND_CASE(append); + EXPAND_COMMAND_CASE(auth); + EXPAND_COMMAND_CASE(bgrewriteaof); + EXPAND_COMMAND_CASE(bgsave); + EXPAND_COMMAND_CASE(bitcount); + EXPAND_COMMAND_CASE(client); + EXPAND_COMMAND_CASE(del); + EXPAND_COMMAND_CASE(exec); + EXPAND_COMMAND_CASE(expire); + EXPAND_COMMAND_CASE(flushall); + EXPAND_COMMAND_CASE(get); + EXPAND_COMMAND_CASE(hello); + EXPAND_COMMAND_CASE(hget); + EXPAND_COMMAND_CASE(hgetall); + EXPAND_COMMAND_CASE(hincrby); + EXPAND_COMMAND_CASE(hkeys); + EXPAND_COMMAND_CASE(hlen); + EXPAND_COMMAND_CASE(hmget); + EXPAND_COMMAND_CASE(hset); + EXPAND_COMMAND_CASE(hvals); + EXPAND_COMMAND_CASE(incr); + EXPAND_COMMAND_CASE(keys); + EXPAND_COMMAND_CASE(llen); + EXPAND_COMMAND_CASE(lpop); + EXPAND_COMMAND_CASE(lpush); + EXPAND_COMMAND_CASE(lrange); + EXPAND_COMMAND_CASE(ltrim); + EXPAND_COMMAND_CASE(multi); + EXPAND_COMMAND_CASE(ping); + EXPAND_COMMAND_CASE(psubscribe); + EXPAND_COMMAND_CASE(publish); + EXPAND_COMMAND_CASE(quit); + EXPAND_COMMAND_CASE(role); + EXPAND_COMMAND_CASE(rpush); + EXPAND_COMMAND_CASE(sadd); + EXPAND_COMMAND_CASE(scard); + EXPAND_COMMAND_CASE(sentinel); + EXPAND_COMMAND_CASE(set); + EXPAND_COMMAND_CASE(smembers); + EXPAND_COMMAND_CASE(subscribe); + EXPAND_COMMAND_CASE(unsubscribe); + EXPAND_COMMAND_CASE(zadd); + EXPAND_COMMAND_CASE(zrange); + EXPAND_COMMAND_CASE(zrangebyscore); + EXPAND_COMMAND_CASE(zremrangebyscore); + EXPAND_COMMAND_CASE(none); + default: assert(false); + } +} + +} +} + diff --git a/include/aedis/parser.hpp b/include/aedis/parser.hpp new file mode 100644 index 00000000..245926f3 --- /dev/null +++ b/include/aedis/parser.hpp @@ -0,0 +1,224 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "type.hpp" + +namespace aedis { namespace resp { + +// Converts a decimal number in ascii format to an integer. +inline +long long length(char const* p) +{ + long long len = 0; + while (*p != '\r') { + len = (10 * len) + (*p - '0'); + p++; + } + return len; +} + +enum class bulk_type +{ blob_error +, verbatim_string +, blob_string +, streamed_string_part +, none +}; + +template +class parser { +public: +private: + Response* res_; + int depth_; + int sizes_[6]; // Streaming will require a bigger integer. + bulk_type bulk_; + int bulk_length_; + + void init(Response* res) + { + res_ = res; + depth_ = 0; + sizes_[0] = 2; + sizes_[1] = 1; + sizes_[2] = 1; + sizes_[3] = 1; + sizes_[4] = 1; + sizes_[5] = 1; + sizes_[6] = 1; + bulk_ = bulk_type::none; + bulk_length_ = std::numeric_limits::max(); + } + + auto on_array_impl(char const* data, int m = 1) + { + auto const l = length(data + 1); + if (l == 0) { + --sizes_[depth_]; + return l; + } + + auto const size = m * l; + sizes_[++depth_] = size; + return size; + } + + void on_array(char const* data) + { res_->select_array(on_array_impl(data, 1)); } + + void on_push(char const* data) + { res_->select_push(on_array_impl(data, 1)); } + + void on_set(char const* data) + { res_->select_set(on_array_impl(data, 1)); } + + void on_map(char const* data) + { res_->select_map(on_array_impl(data, 2)); } + + void on_attribute(char const* data) + { res_->select_attribute(on_array_impl(data, 2)); } + + void on_null() + { + res_->on_null(); + --sizes_[depth_]; + } + + auto handle_simple_string(char const* data, std::size_t n) + { + --sizes_[depth_]; + return std::string_view {data + 1, n - 3}; + } + + void on_simple_string(char const* data, std::size_t n) + { res_->on_simple_string(handle_simple_string(data, n)); } + + void on_simple_error(char const* data, std::size_t n) + { res_->on_simple_error(handle_simple_string(data, n)); } + + void on_number(char const* data, std::size_t n) + { res_->on_number(handle_simple_string(data, n)); } + + void on_double(char const* data, std::size_t n) + { res_->on_double(handle_simple_string(data, n)); } + + void on_boolean(char const* data, std::size_t n) + { res_->on_bool(handle_simple_string(data, n)); } + + void on_big_number(char const* data, std::size_t n) + { res_->on_big_number(handle_simple_string(data, n)); } + + void on_bulk(bulk_type b, std::string_view s = {}) + { + switch (b) { + case bulk_type::blob_error: res_->on_blob_error(s); break; + case bulk_type::verbatim_string: res_->on_verbatim_string(s); break; + case bulk_type::blob_string: res_->on_blob_string(s); break; + case bulk_type::streamed_string_part: + { + if (std::empty(s)) { + sizes_[depth_] = 1; + } else { + res_->on_streamed_string_part(s); + } + } break; + default: assert(false); + } + + --sizes_[depth_]; + } + + auto on_blob_error_impl(char const* data, bulk_type b) + { + bulk_length_ = length(data + 1); + return b; + } + + auto on_streamed_string_size(char const* data) + { return on_blob_error_impl(data, bulk_type::streamed_string_part); } + + auto on_blob_error(char const* data) + { return on_blob_error_impl(data, bulk_type::blob_error); } + + auto on_verbatim_string(char const* data) + { return on_blob_error_impl(data, bulk_type::verbatim_string); } + + auto on_blob_string(char const* data) + { + if (*(data + 1) == '?') { + sizes_[++depth_] = std::numeric_limits::max(); + return bulk_type::none; + } + + return on_blob_error_impl(data, bulk_type::blob_string); + } + +public: + parser(Response* res) + { init(res); } + + std::size_t advance(char const* data, std::size_t n) + { + auto next = bulk_type::none; + if (bulk_ != bulk_type::none) { + n = bulk_length_ + 2; + on_bulk(bulk_, {data, (std::size_t)bulk_length_}); + } else { + if (sizes_[depth_] != 0) { + switch (*data) { + case '!': next = on_blob_error(data); break; + case '=': next = on_verbatim_string(data); break; + case '$': next = on_blob_string(data); break; + case ';': next = on_streamed_string_size(data); break; + case '-': on_simple_error(data, n); break; + case ':': on_number(data, n); break; + case ',': on_double(data, n); break; + case '#': on_boolean(data, n); break; + case '(': on_big_number(data, n); break; + case '+': on_simple_string(data, n); break; + case '_': on_null(); break; + case '>': on_push(data); break; + case '~': on_set(data); break; + case '*': on_array(data); break; + case '|': on_attribute(data); break; + case '%': on_map(data); break; + default: assert(false); + } + } else { + } + } + + while (sizes_[depth_] == 0) { + res_->pop(); + --sizes_[--depth_]; + } + + bulk_ = next; + return n; + } + + auto done() const noexcept + { return depth_ == 0 && bulk_ == bulk_type::none; } + + auto bulk() const noexcept + { return bulk_; } + + auto bulk_length() const noexcept + { return bulk_length_; } +}; + +} // resp +} // aedis diff --git a/include/aedis/read.hpp b/include/aedis/read.hpp index e2c2d19e..b9b77d8f 100644 --- a/include/aedis/read.hpp +++ b/include/aedis/read.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -20,210 +20,11 @@ #include #include +#include "type.hpp" +#include "parser.hpp" + namespace aedis { namespace resp { -// Converts a decimal number in ascii format to an integer. -inline -long long length(char const* p) -{ - long long len = 0; - while (*p != '\r') { - len = (10 * len) + (*p - '0'); - p++; - } - return len; -} - -enum class bulk_type -{ blob_error -, verbatim_string -, blob_string -, streamed_string_part -, none -}; - -template -class parser { -public: -private: - Response* res_; - int depth_; - int sizes_[6]; // Streaming will require a bigger integer. - bulk_type bulk_; - int bulk_length_; - - void init(Response* res) - { - res_ = res; - depth_ = 0; - sizes_[0] = 2; - sizes_[1] = 1; - sizes_[2] = 1; - sizes_[3] = 1; - sizes_[4] = 1; - sizes_[5] = 1; - sizes_[6] = 1; - bulk_ = bulk_type::none; - bulk_length_ = std::numeric_limits::max(); - } - - auto on_array_impl(char const* data, int m = 1) - { - auto const l = length(data + 1); - if (l == 0) { - --sizes_[depth_]; - return l; - } - - auto const size = m * l; - sizes_[++depth_] = size; - return size; - } - - void on_array(char const* data) - { res_->select_array(on_array_impl(data, 1)); } - - void on_push(char const* data) - { res_->select_push(on_array_impl(data, 1)); } - - void on_set(char const* data) - { res_->select_set(on_array_impl(data, 1)); } - - void on_map(char const* data) - { res_->select_map(on_array_impl(data, 2)); } - - void on_attribute(char const* data) - { res_->select_attribute(on_array_impl(data, 2)); } - - void on_null() - { - res_->on_null(); - --sizes_[depth_]; - } - - auto handle_simple_string(char const* data, std::size_t n) - { - --sizes_[depth_]; - return std::string_view {data + 1, n - 3}; - } - - void on_simple_string(char const* data, std::size_t n) - { res_->on_simple_string(handle_simple_string(data, n)); } - - void on_simple_error(char const* data, std::size_t n) - { res_->on_simple_error(handle_simple_string(data, n)); } - - void on_number(char const* data, std::size_t n) - { res_->on_number(handle_simple_string(data, n)); } - - void on_double(char const* data, std::size_t n) - { res_->on_double(handle_simple_string(data, n)); } - - void on_boolean(char const* data, std::size_t n) - { res_->on_bool(handle_simple_string(data, n)); } - - void on_big_number(char const* data, std::size_t n) - { res_->on_big_number(handle_simple_string(data, n)); } - - void on_bulk(bulk_type b, std::string_view s = {}) - { - switch (b) { - case bulk_type::blob_error: res_->on_blob_error(s); break; - case bulk_type::verbatim_string: res_->on_verbatim_string(s); break; - case bulk_type::blob_string: res_->on_blob_string(s); break; - case bulk_type::streamed_string_part: - { - if (std::empty(s)) { - sizes_[depth_] = 1; - } else { - res_->on_streamed_string_part(s); - } - } break; - default: assert(false); - } - - --sizes_[depth_]; - } - - auto on_blob_error_impl(char const* data, bulk_type b) - { - bulk_length_ = length(data + 1); - return b; - } - - auto on_streamed_string_size(char const* data) - { return on_blob_error_impl(data, bulk_type::streamed_string_part); } - - auto on_blob_error(char const* data) - { return on_blob_error_impl(data, bulk_type::blob_error); } - - auto on_verbatim_string(char const* data) - { return on_blob_error_impl(data, bulk_type::verbatim_string); } - - auto on_blob_string(char const* data) - { - if (*(data + 1) == '?') { - sizes_[++depth_] = std::numeric_limits::max(); - return bulk_type::none; - } - - return on_blob_error_impl(data, bulk_type::blob_string); - } - -public: - parser(Response* res) - { init(res); } - - std::size_t advance(char const* data, std::size_t n) - { - auto next = bulk_type::none; - if (bulk_ != bulk_type::none) { - n = bulk_length_ + 2; - on_bulk(bulk_, {data, (std::size_t)bulk_length_}); - } else { - if (sizes_[depth_] != 0) { - switch (*data) { - case '!': next = on_blob_error(data); break; - case '=': next = on_verbatim_string(data); break; - case '$': next = on_blob_string(data); break; - case ';': next = on_streamed_string_size(data); break; - case '-': on_simple_error(data, n); break; - case ':': on_number(data, n); break; - case ',': on_double(data, n); break; - case '#': on_boolean(data, n); break; - case '(': on_big_number(data, n); break; - case '+': on_simple_string(data, n); break; - case '_': on_null(); break; - case '>': on_push(data); break; - case '~': on_set(data); break; - case '*': on_array(data); break; - case '|': on_attribute(data); break; - case '%': on_map(data); break; - default: assert(false); - } - } else { - } - } - - while (sizes_[depth_] == 0) { - res_->pop(); - --sizes_[--depth_]; - } - - bulk_ = next; - return n; - } - - auto done() const noexcept - { return depth_ == 0 && bulk_ == bulk_type::none; } - - auto bulk() const noexcept - { return bulk_; } - - auto bulk_length() const noexcept - { return bulk_length_; } -}; - inline void print_command_raw(std::string const& data, int n) { @@ -394,77 +195,6 @@ auto async_read( stream); } -enum class type -{ array -, push -, set -, map -, attribute -, simple_string -, simple_error -, number -, double_type -, boolean -, big_number -, null -, blob_error -, verbatim_string -, blob_string -, streamed_string_part -, invalid -}; - -#define EXPAND_TYPE_CASE(x) case type::x: return #x - -inline -auto to_string(type t) -{ - switch (t) { - EXPAND_TYPE_CASE(array); - EXPAND_TYPE_CASE(push); - EXPAND_TYPE_CASE(set); - EXPAND_TYPE_CASE(map); - EXPAND_TYPE_CASE(attribute); - EXPAND_TYPE_CASE(simple_string); - EXPAND_TYPE_CASE(simple_error); - EXPAND_TYPE_CASE(number); - EXPAND_TYPE_CASE(double_type); - EXPAND_TYPE_CASE(boolean); - EXPAND_TYPE_CASE(big_number); - EXPAND_TYPE_CASE(null); - EXPAND_TYPE_CASE(blob_error); - EXPAND_TYPE_CASE(verbatim_string); - EXPAND_TYPE_CASE(blob_string); - EXPAND_TYPE_CASE(streamed_string_part); - EXPAND_TYPE_CASE(invalid); - default: assert(false); - } -} - -inline -auto to_type(char c) -{ - switch (c) { - case '!': return type::blob_error; - case '=': return type::verbatim_string; - case '$': return type::blob_string; - case ';': return type::streamed_string_part; - case '-': return type::simple_error; - case ':': return type::number; - case ',': return type::double_type; - case '#': return type::boolean; - case '(': return type::big_number; - case '+': return type::simple_string; - case '_': return type::null; - case '>': return type::push; - case '~': return type::set; - case '*': return type::array; - case '|': return type::attribute; - case '%': return type::map; - default: return type::invalid; - } -} - template < class AsyncReadStream, class Storage> diff --git a/include/aedis/request.hpp b/include/aedis/request.hpp index 8db1de91..a8542ca0 100644 --- a/include/aedis/request.hpp +++ b/include/aedis/request.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -18,6 +18,8 @@ #include #include +#include "command.hpp" + namespace aedis { namespace resp { inline @@ -131,111 +133,6 @@ void assemble(std::string& ret, std::string_view cmd, std::string_view key) assemble(ret, cmd, {key}, std::cbegin(dummy), std::cend(dummy)); } -enum class command -{ append -, auth -, bgrewriteaof -, bgsave -, bitcount -, client -, del -, exec -, expire -, flushall -, get // 10 -, hello -, hget -, hgetall -, hincrby -, hkeys -, hlen -, hmget -, hset -, hvals -, incr // 20 -, keys -, llen -, lpop -, lpush -, lrange -, ltrim -, multi -, ping -, psubscribe -, publish // 30 -, quit -, role -, rpush -, sadd -, scard -, sentinel -, set -, smembers -, subscribe -, unsubscribe // 40 -, zadd -, zrange -, zrangebyscore -, zremrangebyscore -, none -}; - -#define EXPAND_COMMAND_CASE(x) case command::x: return #x - -inline -auto const* to_string(command c) -{ - switch (c) { - EXPAND_COMMAND_CASE(append); - EXPAND_COMMAND_CASE(auth); - EXPAND_COMMAND_CASE(bgrewriteaof); - EXPAND_COMMAND_CASE(bgsave); - EXPAND_COMMAND_CASE(bitcount); - EXPAND_COMMAND_CASE(client); - EXPAND_COMMAND_CASE(del); - EXPAND_COMMAND_CASE(exec); - EXPAND_COMMAND_CASE(expire); - EXPAND_COMMAND_CASE(flushall); - EXPAND_COMMAND_CASE(get); - EXPAND_COMMAND_CASE(hello); - EXPAND_COMMAND_CASE(hget); - EXPAND_COMMAND_CASE(hgetall); - EXPAND_COMMAND_CASE(hincrby); - EXPAND_COMMAND_CASE(hkeys); - EXPAND_COMMAND_CASE(hlen); - EXPAND_COMMAND_CASE(hmget); - EXPAND_COMMAND_CASE(hset); - EXPAND_COMMAND_CASE(hvals); - EXPAND_COMMAND_CASE(incr); - EXPAND_COMMAND_CASE(keys); - EXPAND_COMMAND_CASE(llen); - EXPAND_COMMAND_CASE(lpop); - EXPAND_COMMAND_CASE(lpush); - EXPAND_COMMAND_CASE(lrange); - EXPAND_COMMAND_CASE(ltrim); - EXPAND_COMMAND_CASE(multi); - EXPAND_COMMAND_CASE(ping); - EXPAND_COMMAND_CASE(psubscribe); - EXPAND_COMMAND_CASE(publish); - EXPAND_COMMAND_CASE(quit); - EXPAND_COMMAND_CASE(role); - EXPAND_COMMAND_CASE(rpush); - EXPAND_COMMAND_CASE(sadd); - EXPAND_COMMAND_CASE(scard); - EXPAND_COMMAND_CASE(sentinel); - EXPAND_COMMAND_CASE(set); - EXPAND_COMMAND_CASE(smembers); - EXPAND_COMMAND_CASE(subscribe); - EXPAND_COMMAND_CASE(unsubscribe); - EXPAND_COMMAND_CASE(zadd); - EXPAND_COMMAND_CASE(zrange); - EXPAND_COMMAND_CASE(zrangebyscore); - EXPAND_COMMAND_CASE(zremrangebyscore); - EXPAND_COMMAND_CASE(none); - default: assert(false); - } -} - enum class event {ignore}; // TODO: Make the write functions friend of this class and make the diff --git a/include/aedis/response.hpp b/include/aedis/response.hpp index 0ca58f03..d640e017 100644 --- a/include/aedis/response.hpp +++ b/include/aedis/response.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this @@ -17,22 +17,7 @@ #include #include -template -void print(Iter begin, Iter end, char const* p) -{ - std::cout << p << ": "; - for (; begin != end; ++begin) - std::cout << *begin << " "; - std::cout << std::endl; -} - -template -void print(Range const& v, char const* p = "") -{ - using std::cbegin; - using std::cend; - print(cbegin(v), cend(v), p); -} +#include "type.hpp" namespace aedis { namespace resp { @@ -49,7 +34,7 @@ inline void from_string_view(std::string_view s, std::string& r) { r = s; } -// The interface required from all response types. +// The interface required from from the parser. struct response_ignore { void pop() {} void select_array(int n) {} @@ -74,67 +59,77 @@ struct response_ignore { // To receive transactions with responses the are not recursive // themselves. -class response_depth1 { +class response_transaction { +public: + struct elem { + int depth; + type t; + std::vector value; + }; + private: - int i_ = 0; - std::vector> resps_; - void add_element(int n) + int depth_ = 0; + std::vector resps_; + + void add_element(int n, type t) { - ++i_; - if (i_ == 2) - resps_.push_back({}); + if (depth_ == 0) { + resps_.reserve(n); + ++depth_; + return; + } + + assert(depth_ == 1); + resps_.emplace_back(depth_, t); + resps_.back().value.reserve(n); + ++depth_; } - void add(std::string_view s) { resps_.back().push_back(std::string{s}); } + void add(std::string_view s, type t) + { + assert(!std::empty(resps_)); + if (depth_ == 1) { + resps_.emplace_back(depth_, t, std::vector{std::string{s}}); + } else { + resps_.back().value.push_back(std::string{s}); + } + } public: void clear() - { resps_.clear(); i_ = 0;} + { resps_.clear(); depth_ = 0;} auto size() const { return resps_.size(); } auto& at(int i) { return resps_.at(i); } auto const& at(int i) const { return resps_.at(i); } - void pop() - { - --i_; - } + void pop() { --depth_; } - void select_array(int n) {add_element(n);} - void select_push(int n) {add_element(n);} - void select_set(int n) {add_element(n);} - void select_map(int n) {add_element(n);} - void select_attribute(int n) {add_element(n);} + void select_array(int n) {add_element(n, type::array);} + void select_push(int n) {add_element(n, type::push);} + void select_set(int n) {add_element(n, type::set);} + void select_map(int n) {add_element(n, type::map);} + void select_attribute(int n) {add_element(n, type::attribute);} - void on_simple_string(std::string_view s) {add(s);} - void on_simple_error(std::string_view s) {add(s);} - void on_number(std::string_view s) {add(s);} - void on_double(std::string_view s) {add(s);} - void on_bool(std::string_view s) {add(s);} - void on_big_number(std::string_view s) {add(s);} - void on_null() {add({});} - void on_blob_error(std::string_view s = {}) {add(s);} - void on_verbatim_string(std::string_view s = {}) {add(s);} - void on_blob_string(std::string_view s = {}) {add(s);} - void on_streamed_string_part(std::string_view s = {}) {add(s);} -}; - -enum class aggregate_type -{ attribute -, push -, array -, set -, map -, none + void on_simple_string(std::string_view s) { add(s, type::simple_string); } + void on_simple_error(std::string_view s) { add(s, type::simple_error); } + void on_number(std::string_view s) {add(s, type::number);} + void on_double(std::string_view s) {add(s, type::double_type);} + void on_bool(std::string_view s) {add(s, type::boolean);} + void on_big_number(std::string_view s) {add(s, type::big_number);} + void on_null() {add({}, type::null);} + void on_blob_error(std::string_view s = {}) {add(s, type::blob_error);} + void on_verbatim_string(std::string_view s = {}) {add(s, type::verbatim_string);} + void on_blob_string(std::string_view s = {}) {add(s, type::blob_string);} + void on_streamed_string_part(std::string_view s = {}) {add(s, type::streamed_string_part);} }; +// A base class for flat responses which means response with no +// embedded types in themselves. For exaple, a transaction with an +// lrange in it will produce a response that is an array with an +// array. That is not suitable for this class. class response_base { -private: - // TODO: Use the type enum in read.hpp and a static_array. The - // size of the array must be the same as that of the parser. - std::vector aggregates_; - protected: virtual void on_simple_string_impl(std::string_view s) { throw std::runtime_error("on_simple_string_impl: Has not been overridden."); } @@ -168,24 +163,67 @@ protected: { throw std::runtime_error("select_push_impl: Has not been overridden."); } public: - void pop() {aggregates_.pop_back();} - void select_attribute(int n) { aggregates_.push_back(aggregate_type::attribute); } - void select_push(int n) { aggregates_.push_back(aggregate_type::push); } - void select_array(int n) { aggregates_.push_back(aggregate_type::array); } - void select_set(int n) { aggregates_.push_back(aggregate_type::set); } - void select_map(int n) { aggregates_.push_back(aggregate_type::map); } + void pop() {} + void select_attribute(int n) + { + } + void select_push(int n) + { + } + void select_array(int n) + { + } + void select_set(int n) + { + } + void select_map(int n) + { + } + void on_simple_error(std::string_view s) + { + on_simple_error_impl(s); + } + void on_blob_error(std::string_view s = {}) + { + on_blob_error_impl(s); + } + void on_null() + { + on_null_impl(); + } + void on_simple_string(std::string_view s) + { + on_simple_string_impl(s); + } + void on_number(std::string_view s) + { + on_number_impl(s); + } + void on_double(std::string_view s) + { + on_double_impl(s); + } + void on_bool(std::string_view s) + { + on_bool_impl(s); + } + void on_big_number(std::string_view s) + { + on_big_number_impl(s); + } + void on_verbatim_string(std::string_view s = {}) + { + on_verbatim_string_impl(s); + } + void on_blob_string(std::string_view s = {}) + { + on_blob_string_impl(s); + } + void on_streamed_string_part(std::string_view s = {}) + { + on_streamed_string_part_impl(s); + } - void on_simple_error(std::string_view s) { on_simple_error_impl(s); } - void on_blob_error(std::string_view s = {}) { on_blob_error_impl(s); } - void on_null() {on_null_impl(); } - void on_simple_string(std::string_view s) { on_simple_string_impl(s); } - void on_number(std::string_view s) { on_number_impl(s); } - void on_double(std::string_view s) { on_double_impl(s); } - void on_bool(std::string_view s) { on_bool_impl(s); } - void on_big_number(std::string_view s) { on_big_number_impl(s); } - void on_verbatim_string(std::string_view s = {}) { on_verbatim_string_impl(s); } - void on_blob_string(std::string_view s = {}) { on_blob_string_impl(s); } - void on_streamed_string_part(std::string_view s = {}) { on_streamed_string_part_impl(s); } virtual ~response_base() {} }; @@ -441,17 +479,5 @@ public: std::array result; }; -struct responses { - response_flat_map push; - response_simple_string simple_string; - response_blob_string blob_string; - response_flat_map map; - response_array array; - response_set set; - response_number number; - response_depth1 depth1; - std::queue trans; -}; - } // resp } // aedis diff --git a/include/aedis/type.hpp b/include/aedis/type.hpp new file mode 100644 index 00000000..64d64cad --- /dev/null +++ b/include/aedis/type.hpp @@ -0,0 +1,84 @@ +/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +namespace aedis { namespace resp { + +enum class type +{ array +, push +, set +, map +, attribute +, simple_string +, simple_error +, number +, double_type +, boolean +, big_number +, null +, blob_error +, verbatim_string +, blob_string +, streamed_string_part +, invalid +}; + +#define EXPAND_TYPE_CASE(x) case type::x: return #x + +inline +auto to_string(type t) +{ + switch (t) { + EXPAND_TYPE_CASE(array); + EXPAND_TYPE_CASE(push); + EXPAND_TYPE_CASE(set); + EXPAND_TYPE_CASE(map); + EXPAND_TYPE_CASE(attribute); + EXPAND_TYPE_CASE(simple_string); + EXPAND_TYPE_CASE(simple_error); + EXPAND_TYPE_CASE(number); + EXPAND_TYPE_CASE(double_type); + EXPAND_TYPE_CASE(boolean); + EXPAND_TYPE_CASE(big_number); + EXPAND_TYPE_CASE(null); + EXPAND_TYPE_CASE(blob_error); + EXPAND_TYPE_CASE(verbatim_string); + EXPAND_TYPE_CASE(blob_string); + EXPAND_TYPE_CASE(streamed_string_part); + EXPAND_TYPE_CASE(invalid); + default: assert(false); + } +} + +inline +auto to_type(char c) +{ + switch (c) { + case '!': return type::blob_error; + case '=': return type::verbatim_string; + case '$': return type::blob_string; + case ';': return type::streamed_string_part; + case '-': return type::simple_error; + case ':': return type::number; + case ',': return type::double_type; + case '#': return type::boolean; + case '(': return type::big_number; + case '+': return type::simple_string; + case '_': return type::null; + case '>': return type::push; + case '~': return type::set; + case '*': return type::array; + case '|': return type::attribute; + case '%': return type::map; + default: return type::invalid; + } +} + +} // resp +} // aedis diff --git a/include/aedis/utils.hpp b/include/aedis/utils.hpp new file mode 100644 index 00000000..0242dec0 --- /dev/null +++ b/include/aedis/utils.hpp @@ -0,0 +1,32 @@ +/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include + +namespace aedis { namespace resp { + +template +void print(Iter begin, Iter end, char const* p) +{ + std::cout << p << ": "; + for (; begin != end; ++begin) + std::cout << *begin << " "; + std::cout << std::endl; +} + +template +void print(Range const& v, char const* p = "") +{ + using std::cbegin; + using std::cend; + print(cbegin(v), cend(v), p); +} + +} // resp +} // aedis