diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 2315a979..67005c82 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -9,6 +9,16 @@ using namespace aedis; +void print_event(resp3::type t, std::pair const& p) +{ + std::cout << "Event: " << p.first << "."; + + if (!std::empty(p.second)) + std::cout << " Key: " << p.second << "."; + + std::cout << " Type: " << t << std::endl; +} + net::awaitable example(net::ip::tcp::socket& socket, std::queue& requests) @@ -20,17 +30,17 @@ example(net::ip::tcp::socket& socket, resp3::consumer cs; for (;;) { - auto const type = co_await cs.async_consume(socket, requests, resp, net::use_awaitable); + auto const t = co_await cs.async_consume(socket, requests, resp, net::use_awaitable); - if (type == resp3::type::flat_push) { - std::cout << "Event: " << "(" << type << ")" << std::endl; + if (t == resp3::type::flat_push) { + std::cout << "Event: " << "(" << t << ")" << std::endl; continue; } - auto const cmd = requests.front().commands.front(); + auto const id = requests.front().ids.front(); - std::cout << "Event: " << cmd << " (" << type << ")" << std::endl; - switch (cmd) { + print_event(t, id); + switch (id.first) { case command::hello: { prepare_next(requests); diff --git a/include/aedis/command.hpp b/include/aedis/command.hpp index 202ebe79..e6b3c9de 100644 --- a/include/aedis/command.hpp +++ b/include/aedis/command.hpp @@ -61,6 +61,7 @@ enum class command , rpush , sadd , scard +, sdiff , sentinel , set , smembers diff --git a/include/aedis/resp3/impl/response.ipp b/include/aedis/resp3/impl/response.ipp index 26665164..a25c4828 100644 --- a/include/aedis/resp3/impl/response.ipp +++ b/include/aedis/resp3/impl/response.ipp @@ -10,7 +10,8 @@ namespace aedis { namespace resp3 { -response_adapter_base* response::select_adapter(type type, command cmd) +response_adapter_base* +response::select_adapter(type type, command cmd, std::string const&) { if (type == type::flat_push) return &flat_push_adapter_; diff --git a/include/aedis/resp3/read.hpp b/include/aedis/resp3/read.hpp index 6bdec3c6..b9dac1d4 100644 --- a/include/aedis/resp3/read.hpp +++ b/include/aedis/resp3/read.hpp @@ -260,12 +260,14 @@ struct consumer_op { yield { - auto cmd = command::unknown; - if (m_type != type::flat_push) - cmd = requests.front().commands.front(); - - auto* adapter = resp.select_adapter(m_type, cmd); - async_read_one(socket, buffer, *adapter, std::move(self)); + if (m_type == type::flat_push) { + auto* adapter = resp.select_adapter(m_type, command::unknown, {}); + async_read_one(socket, buffer, *adapter, std::move(self)); + } else { + auto const& pair = requests.front().ids.front(); + auto* adapter = resp.select_adapter(m_type, pair.first, pair.second); + async_read_one(socket, buffer, *adapter, std::move(self)); + } } if (ec) { @@ -276,9 +278,9 @@ struct consumer_op { yield self.complete(ec, m_type); if (m_type != type::flat_push) - requests.front().commands.pop(); + requests.front().ids.pop(); - } while (!std::empty(requests.front().commands)); + } while (!std::empty(requests.front().ids)); requests.pop(); } while (std::empty(requests)); } diff --git a/include/aedis/resp3/request.hpp b/include/aedis/resp3/request.hpp index 5e91c15c..8afacb96 100644 --- a/include/aedis/resp3/request.hpp +++ b/include/aedis/resp3/request.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -117,14 +118,14 @@ void assemble(std::string& ret, std::string_view cmd, std::string_view key); class request { public: std::string payload; - std::queue commands; + std::queue> ids; bool sent = false; public: /// Return the size of the pipeline. i.e. how many commands it - /// contians. + /// contains. auto size() const noexcept - { return std::size(commands); } + { return std::size(ids); } auto payload_size() const noexcept { return std::size(payload); } @@ -136,72 +137,72 @@ public: void clear() { payload.clear(); - commands = {}; + ids = {}; } void ping() { assemble(payload, "PING"); - commands.push(command::ping); + ids.push(std::make_pair(command::ping, std::string{})); } void quit() { assemble(payload, "QUIT"); - commands.push(command::quit); + ids.push(std::make_pair(command::quit, std::string{})); } void multi() { assemble(payload, "MULTI"); - commands.push(command::multi); + ids.push(std::make_pair(command::multi, std::string{})); } void exec() { assemble(payload, "EXEC"); - commands.push(command::exec); + ids.push(std::make_pair(command::exec, std::string{})); } void incr(std::string_view key) { assemble(payload, "INCR", key); - commands.push(command::incr); + ids.push(std::make_pair(command::incr, std::string{key})); } /// Adds auth to the request, see https://redis.io/commands/bgrewriteaof void auth(std::string_view pwd) { assemble(payload, "AUTH", pwd); - commands.push(command::auth); + ids.push(std::make_pair(command::auth, std::string{})); } /// Adds bgrewriteaof to the request, see https://redis.io/commands/bgrewriteaof void bgrewriteaof() { assemble(payload, "BGREWRITEAOF"); - commands.push(command::bgrewriteaof); + ids.push(std::make_pair(command::bgrewriteaof, std::string{})); } /// Adds role to the request, see https://redis.io/commands/role void role() { assemble(payload, "ROLE"); - commands.push(command::role); + ids.push(std::make_pair(command::role, std::string{})); } /// Adds bgsave to the request, see //https://redis.io/commands/bgsave void bgsave() { assemble(payload, "BGSAVE"); - commands.push(command::bgsave); + ids.push(std::make_pair(command::bgsave, std::string{})); } /// Adds ping to the request, see https://redis.io/commands/flushall void flushall() { assemble(payload, "FLUSHALL"); - commands.push(command::flushall); + ids.push(std::make_pair(command::flushall, std::string{})); } /// Adds ping to the request, see https://redis.io/commands/lpop @@ -219,7 +220,7 @@ public: // std::cend(par)); //} - commands.push(command::lpop); + ids.push(std::make_pair(command::lpop, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/subscribe @@ -240,21 +241,21 @@ public: void get(std::string_view key) { assemble(payload, "GET", key); - commands.push(command::get); + ids.push(std::make_pair(command::get, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/keys void keys(std::string_view pattern) { assemble(payload, "KEYS", pattern); - commands.push(command::keys); + ids.push(std::make_pair(command::keys, std::string{})); } /// Adds ping to the request, see https://redis.io/commands/hello void hello(std::string_view version = "3") { assemble(payload, "HELLO", version); - commands.push(command::hello); + ids.push(std::make_pair(command::hello, std::string{})); } /// Adds ping to the request, see https://redis.io/commands/sentinel @@ -262,7 +263,7 @@ public: { auto par = {name}; assemble(payload, "SENTINEL", {arg}, std::cbegin(par), std::cend(par)); - commands.push(command::sentinel); + ids.push(std::make_pair(command::sentinel, std::string{})); } /// Adds ping to the request, see https://redis.io/commands/append @@ -270,7 +271,7 @@ public: { auto par = {msg}; assemble(payload, "APPEND", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::append); + ids.push(std::make_pair(command::append, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/bitcount @@ -285,7 +286,7 @@ public: , {key} , std::cbegin(par) , std::cend(par)); - commands.push(command::bitcount); + ids.push(std::make_pair(command::bitcount, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/rpush @@ -293,7 +294,7 @@ public: void rpush(std::string_view key, Iter begin, Iter end) { assemble(payload, "RPUSH", {key}, begin, end); - commands.push(command::rpush); + ids.push(std::make_pair(command::rpush, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/rpush @@ -317,7 +318,7 @@ public: void lpush(std::string_view key, Iter begin, Iter end) { assemble(payload, "LPUSH", {key}, begin, end); - commands.push(command::lpush); + ids.push(std::make_pair(command::lpush, std::string{key})); } void psubscribe( std::initializer_list l) @@ -331,7 +332,7 @@ public: { auto par = {msg}; assemble(payload, "PUBLISH", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::publish); + ids.push(std::make_pair(command::publish, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/set @@ -339,7 +340,7 @@ public: std::initializer_list args) { assemble(payload, "SET", {key}, std::cbegin(args), std::cend(args)); - commands.push(command::set); + ids.push(std::make_pair(command::set, std::string{key})); } // TODO: Find a way to assert the value type is a pair. @@ -352,7 +353,7 @@ public: using std::cbegin; using std::cend; assemble(payload, "HSET", {key}, std::cbegin(r), std::cend(r), 2); - commands.push(command::hset); + ids.push(std::make_pair(command::hset, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hincrby @@ -361,7 +362,7 @@ public: auto by_str = std::to_string(by); std::initializer_list par {field, by_str}; assemble(payload, "HINCRBY", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::hincrby); + ids.push(std::make_pair(command::hincrby, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hkeys @@ -369,28 +370,28 @@ public: { auto par = {""}; assemble(payload, "HKEYS", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::hkeys); + ids.push(std::make_pair(command::hkeys, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hlen void hlen(std::string_view key) { assemble(payload, "HLEN", {key}); - commands.push(command::hlen); + ids.push(std::make_pair(command::hlen, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hgetall void hgetall(std::string_view key) { assemble(payload, "HGETALL", {key}); - commands.push(command::hgetall); + ids.push(std::make_pair(command::hgetall, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hvals void hvals( std::string_view key) { assemble(payload, "HVALS", {key}); - commands.push(command::hvals); + ids.push(std::make_pair(command::hvals, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hget @@ -398,7 +399,7 @@ public: { auto par = {field}; assemble(payload, "HGET", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::hget); + ids.push(std::make_pair(command::hget, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hmget @@ -412,7 +413,7 @@ public: , std::cbegin(fields) , std::cend(fields)); - commands.push(command::hmget); + ids.push(std::make_pair(command::hmget, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/hdel @@ -427,7 +428,7 @@ public: std::cbegin(fields), std::cend(fields)); - commands.push(command::hdel); + ids.push(std::make_pair(command::hdel, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/expire @@ -436,7 +437,7 @@ public: auto const str = std::to_string(secs); std::initializer_list par {str}; assemble(payload, "EXPIRE", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::expire); + ids.push(std::make_pair(command::expire, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/zadd @@ -445,15 +446,15 @@ public: auto const score_str = std::to_string(score); std::initializer_list par = {score_str, value}; assemble(payload, "ZADD", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::zadd); + ids.push(std::make_pair(command::zadd, std::string{key})); } - /// Adds ping to the request, see https://redis.io/commands/zadd + /// Adds zadd to the request, see https://redis.io/commands/zadd template void zadd(std::initializer_list key, Range const& r) { assemble(payload, "ZADD", key, std::cbegin(r), std::cend(r), 2); - commands.push(command::zadd); + ids.push(std::make_pair(command::zadd, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/zrange @@ -464,7 +465,7 @@ public: std::initializer_list par {min_str, max_str}; assemble(payload, "ZRANGE", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::zrange); + ids.push(std::make_pair(command::zrange, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/zrangebyscore @@ -477,7 +478,7 @@ public: auto const min_str = std::to_string(min); auto par = {min_str , max_str}; assemble(payload, "ZRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::zrangebyscore); + ids.push(std::make_pair(command::zrangebyscore, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/zremrangebyscore @@ -489,7 +490,7 @@ public: { auto par = {min, max}; assemble(payload, "ZREMRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::zremrangebyscore); + ids.push(std::make_pair(command::zremrangebyscore, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/lrange @@ -499,7 +500,7 @@ public: auto const max_str = std::to_string(max); std::initializer_list par {min_str, max_str}; assemble(payload, "LRANGE", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::lrange); + ids.push(std::make_pair(command::lrange, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/ltrim @@ -509,7 +510,7 @@ public: auto const max_str = std::to_string(max); std::initializer_list par {min_str, max_str}; assemble(payload, "LTRIM", {key}, std::cbegin(par), std::cend(par)); - commands.push(command::ltrim); + ids.push(std::make_pair(command::ltrim, std::string{key})); } // TODO: Overload for vector del. @@ -517,14 +518,14 @@ public: void del(std::string_view key) { assemble(payload, "DEL", key); - commands.push(command::del); + ids.push(std::make_pair(command::del, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/llen void llen(std::string_view key) { assemble(payload, "LLEN", key); - commands.push(command::llen); + ids.push(std::make_pair(command::llen, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/sadd @@ -532,7 +533,7 @@ public: void sadd(std::string_view key, Iter begin, Iter end) { assemble(payload, "SADD", {key}, begin, end); - commands.push(command::sadd); + ids.push(std::make_pair(command::sadd, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/sadd @@ -548,28 +549,28 @@ public: void smembers(std::string_view key) { assemble(payload, "SMEMBERS", key); - commands.push(command::smembers); + ids.push(std::make_pair(command::smembers, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/scard void scard(std::string_view key) { assemble(payload, "SCARD", key); - commands.push(command::scard); + ids.push(std::make_pair(command::scard, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/scard void scard(std::string_view key, std::initializer_list l) { assemble(payload, "SDIFF", {key}, std::cbegin(l), std::cend(l)); - commands.push(command::scard); + ids.push(std::make_pair(command::sdiff, std::string{key})); } /// Adds ping to the request, see https://redis.io/commands/client_id void client_id(std::string_view parameters) { assemble(payload, "CLIENT ID", {parameters}); - commands.push(command::client_id); + ids.push(std::make_pair(command::client_id, std::string{})); } }; diff --git a/include/aedis/resp3/response.hpp b/include/aedis/resp3/response.hpp index 2892bce0..c0f860ba 100644 --- a/include/aedis/resp3/response.hpp +++ b/include/aedis/resp3/response.hpp @@ -81,7 +81,8 @@ private: detail::ignore_adapter ignore_adapter_; public: - response_adapter_base* select_adapter(resp3::type type, command cmd); + response_adapter_base* + select_adapter(resp3::type type, command cmd, std::string const& key); auto const& array() const noexcept {return array_;} auto& array() noexcept {return array_;} diff --git a/include/aedis/resp3/write.hpp b/include/aedis/resp3/write.hpp index 440ae979..db1dac83 100644 --- a/include/aedis/resp3/write.hpp +++ b/include/aedis/resp3/write.hpp @@ -78,13 +78,13 @@ struct write_some_op { requests.front().sent = true; - if (std::empty(requests.front().commands)) { + if (std::empty(requests.front().ids)) { // We only pop when all commands in the pipeline has push // responses like subscribe, otherwise, pop is done when the // response arrives. requests.pop(); } - } while (!std::empty(requests) && std::empty(requests.front().commands)); + } while (!std::empty(requests) && std::empty(requests.front().ids)); self.complete(ec); } diff --git a/tests/general.cpp b/tests/general.cpp index 73f2195e..736dce00 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -138,9 +138,9 @@ test_general(net::ip::tcp::resolver::results_type const& res) continue; } - auto const cmd = requests.front().commands.front(); + auto const id = requests.front().ids.front(); - switch (cmd) { + switch (id.first) { case command::hello: { prepare_next(requests); @@ -196,7 +196,7 @@ test_general(net::ip::tcp::resolver::results_type const& res) } break; case command::hgetall: check_equal(resp.flat_map(), {"field1", "value1", "field2", "value2"}, "hgetall (value)"); break; case command::smembers: check_equal(resp.flat_set(), {"1", "2", "3"}, "smembers (value)"); break; - default: { std::cout << "Error: " << type << " " << cmd << std::endl; } + default: { std::cout << "Error: " << type << " " << id.first << std::endl; } } resp.blob_string().clear(); @@ -703,7 +703,7 @@ net::awaitable test_streamed_string() std::string cmd {"$?\r\n;0\r\n"}; test_tcp_socket ts {cmd}; resp3::response resp; - auto* adapter = resp.select_adapter(resp3::type::streamed_string_part, command::unknown); + auto* adapter = resp.select_adapter(resp3::type::streamed_string_part, command::unknown, {}); co_await resp3::async_read_one(ts, buf, *adapter); check_equal(resp.streamed_string_part(), {}, "streamed string (empty)"); }