diff --git a/Makefile.am b/Makefile.am index 8aa36521..ea3856b4 100644 --- a/Makefile.am +++ b/Makefile.am @@ -17,10 +17,10 @@ check_PROGRAMS += serialization_sync check_PROGRAMS += intro_high_level check_PROGRAMS += aggregates_high_level check_PROGRAMS += test_low_level +check_PROGRAMS += test_high_level if HAVE_CXX20 check_PROGRAMS += transaction check_PROGRAMS += custom_adapter -check_PROGRAMS += test_high_level endif EXTRA_PROGRAMS = @@ -45,8 +45,8 @@ serialization_sync_SOURCES = $(top_srcdir)/examples/serialization_sync.cpp commands_SOURCES = $(top_srcdir)/tools/commands.cpp subscriber_high_level_SOURCES = $(top_srcdir)/examples/subscriber_high_level.cpp test_low_level_SOURCES = $(top_srcdir)/tests/low_level.cpp -if HAVE_CXX20 test_high_level_SOURCES = $(top_srcdir)/tests/high_level.cpp +if HAVE_CXX20 transaction_SOURCES = $(top_srcdir)/examples/transaction.cpp subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp custom_adapter_SOURCES = $(top_srcdir)/examples/custom_adapter.cpp diff --git a/aedis/generic/client.hpp b/aedis/generic/client.hpp index eca6bf67..a984d17c 100644 --- a/aedis/generic/client.hpp +++ b/aedis/generic/client.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -21,10 +22,6 @@ #include #include -// TODO: What to do if users send a discard command not contained in a -// transaction. The client object will try to pop the queue until a -// multi is found. - namespace aedis { namespace generic { @@ -115,10 +112,11 @@ public: sr.push(cmd, args...); auto const after = requests_.size(); assert(after - before != 0); - info_.back().size += after - before;; + auto const d = after - before; + info_.back().size += d;; if (!has_push_response(cmd)) { - commands_.push_back(cmd); + commands_.push_back(std::make_pair(cmd, d)); ++info_.back().cmds; } @@ -151,10 +149,11 @@ public: sr.push_range2(cmd, key, begin, end); auto const after = requests_.size(); assert(after - before != 0); - info_.back().size += after - before;; + auto const d = after - before; + info_.back().size += d; if (!has_push_response(cmd)) { - commands_.push_back(cmd); + commands_.push_back(std::make_pair(cmd, d)); ++info_.back().cmds; } @@ -186,10 +185,11 @@ public: sr.push_range2(cmd, begin, end); auto const after = requests_.size(); assert(after - before != 0); - info_.back().size += after - before;; + auto const d = after - before; + info_.back().size += d; if (!has_push_response(cmd)) { - commands_.push_back(cmd); + commands_.push_back(std::make_pair(cmd, d)); ++info_.back().cmds; } @@ -359,7 +359,7 @@ private: ++info_.front().cmds; // Push front. - commands_.push_back(Command::hello); + commands_.push_back(std::make_pair(Command::hello, hello_size)); std::rotate( std::begin(commands_), std::prev(std::end(commands_)), @@ -396,9 +396,6 @@ private: // Returns true when the next request can be writen. bool on_cmd(Command cmd) { - // TODO: If the response to a discard is received we have to - // remove all commands up until multi. - assert(!info_.empty()); assert(!commands_.empty()); @@ -506,7 +503,7 @@ private: std::string requests_; // The commands contained in the requests. - std::vector commands_; + std::vector> commands_; // Info about the requests. std::vector info_; diff --git a/aedis/generic/detail/client_ops.hpp b/aedis/generic/detail/client_ops.hpp index 1f1c91c1..346c9662 100644 --- a/aedis/generic/detail/client_ops.hpp +++ b/aedis/generic/detail/client_ops.hpp @@ -371,7 +371,7 @@ struct reader_op { cli->cmd = Command::invalid; if (cli->data_type != resp3::type::push) { assert(!cli->commands_.empty()); - cli->cmd = cli->commands_.front(); + cli->cmd = cli->commands_.front().first; } yield cli->async_read(std::move(self)); diff --git a/aedis/redis/command.hpp b/aedis/redis/command.hpp index 6b03dcaa..90723fc4 100644 --- a/aedis/redis/command.hpp +++ b/aedis/redis/command.hpp @@ -72,7 +72,7 @@ enum class command { decrby, /// https://redis.io/commands/del del, - /// https://redis.io/commands/discard (not supported yet) + /// https://redis.io/commands/discard discard, /// https://redis.io/commands/dump dump, diff --git a/tests/high_level.cpp b/tests/high_level.cpp index 720aa277..ec2aa7ea 100644 --- a/tests/high_level.cpp +++ b/tests/high_level.cpp @@ -317,366 +317,67 @@ void test_reconnect2() ioc.run(); } -std::vector gresp; +struct receiver7 { +public: + int counter = 0; -net::awaitable -test_general(net::ip::tcp::resolver::results_type const& res) + receiver7(client_type& db) + : db_{&db} + , adapter_{adapt(counter)} + {} + + void on_resp3(command cmd, node const& nd, boost::system::error_code& ec) + { + if (cmd == command::incr) + adapter_(nd, ec); + } + + void on_write(std::size_t) + { + if (!std::exchange(sent_, true)) { + db_->send(command::del, "key"); + db_->send(command::multi); + db_->send(command::ping, "aaa"); + db_->send(command::incr, "key"); + db_->send(command::ping, "bbb"); + db_->send(command::discard); + db_->send(command::ping, "ccc"); + db_->send(command::incr, "key"); + db_->send(command::quit); + } + } + + void on_read(command cmd, std::size_t) + { + } + +private: + bool sent_ = false; + client_type* db_; + adapter_t adapter_; +}; + +void test_discard() { - auto ex = co_await net::this_coro::executor; + auto f = [](auto ec) + { + expect_error(ec, net::error::misc_errors::eof); + }; - std::vector list_ {1 ,2, 3, 4, 5, 6}; - std::string set_ {"aaa"}; + net::io_context ioc; + client_type db(ioc.get_executor()); - //---------------------------------- - std::string request; - auto sr = make_serializer(request); - sr.push(command::hello, 3); - sr.push(command::flushall); - sr.push_range(command::rpush, "a", list_); - sr.push(command::llen, "a"); - sr.push(command::lrange, "a", 0, -1); - sr.push(command::ltrim, "a", 2, -2); - sr.push(command::lpop, "a"); - //sr.lpop("a", 2); // Not working? - sr.push(command::set, "b", set_); - sr.push(command::get, "b"); - sr.push(command::append, "b", "b"); - sr.push(command::del, "b"); - sr.push(command::subscribe, "channel"); - sr.push(command::incr, "3"); + receiver7 recv{db}; + db.set_read_handler([&recv](auto cmd, std::size_t n){recv.on_read(cmd, n);}); + db.set_write_handler([&recv](std::size_t n){recv.on_write(n);}); + db.set_resp3_handler([&recv](auto a, auto b, auto c){recv.on_resp3(a, b, c);}); - // transaction - for (auto i = 0; i < 3; ++i) { - sr.push(command::multi); - sr.push(command::ping); - sr.push(command::lrange, "a", 0, -1); - sr.push(command::ping); - // TODO: It looks like we can't publish to a channel we - // are already subscribed to from inside a transaction. - //req.publish("some-channel", "message1"); - sr.push(command::exec); - } + db.async_run("127.0.0.1", "6379", f); + ioc.run(); - std::map m1 = - { {"field1", "value1"} - , {"field2", "value2"}}; - - sr.push_range(command::hset, "d", m1); - sr.push(command::hget, "d", "field2"); - sr.push(command::hgetall, "d"); - sr.push(command::hdel, "d", "field1", "field2"); // TODO: Test as range too. - sr.push(command::hincrby, "e", "some-field", 10); - - sr.push(command::zadd, "f", 1, "Marcelo"); - sr.push(command::zrange, "f", 0, 1); - sr.push(command::zrangebyscore, "f", 1, 1); - sr.push(command::zremrangebyscore, "f", "-inf", "+inf"); - - auto const v = std::vector{1, 2, 3}; - sr.push_range(command::sadd, "g", v); - sr.push(command::smembers, "g"); - sr.push(command::quit); - //---------------------------------- - - net::ip::tcp::socket socket{ex}; - co_await net::async_connect(socket, res, net::use_awaitable); - co_await net::async_write(socket, net::buffer(request), net::use_awaitable); - - // Reads the responses. - std::string buffer; - - // hello - co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(), net::use_awaitable); - - // flushall - co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(), net::use_awaitable); - - { // rpush: - std::vector resp; - co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable); - auto const n = std::to_string(std::size(list_)); - std::vector expected - { {resp3::type::number, 1UL, 0UL, n} }; - - expect_eq(resp, expected, "rpush (value)"); - } - - { // llen - std::vector resp; - co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable); - std::vector expected - { {resp3::type::number, 1UL, 0UL, {"6"}} }; - expect_eq(resp, expected, "llen"); - } - - { // lrange - std::vector resp; - co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable); - - std::vector expected - { {resp3::type::array, 6UL, 0UL, {}} - , {resp3::type::blob_string, 1UL, 1UL, {"1"}} - , {resp3::type::blob_string, 1UL, 1UL, {"2"}} - , {resp3::type::blob_string, 1UL, 1UL, {"3"}} - , {resp3::type::blob_string, 1UL, 1UL, {"4"}} - , {resp3::type::blob_string, 1UL, 1UL, {"5"}} - , {resp3::type::blob_string, 1UL, 1UL, {"6"}} - }; - - expect_eq(resp, expected, "lrange "); - } - - { // ltrim - std::vector resp; - co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable); - - std::vector expected - { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; - - expect_eq(resp, expected, "ltrim"); - } - - { // lpop - std::vector resp; - co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable); - - std::vector expected - { {resp3::type::blob_string, 1UL, 0UL, {"3"}} }; - - expect_eq(resp, expected, "lpop"); - } - - //{ // lpop - // std::vector resp; - // co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable); - - // std::vector expected - // { {resp3::type::array, 2UL, 0UL, {}} - // , {resp3::type::array, 1UL, 1UL, {"4"}} - // , {resp3::type::array, 1UL, 1UL, {"5"}} - // }; - - // expect_eq(resp, expected, "lpop"); - //} - - //{ // lrange - // static int c = 0; - - // if (c == 0) { - // std::vector expected - // { {resp3::type::array, 6UL, 0UL, {}} - // , {resp3::type::blob_string, 1UL, 1UL, {"1"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"2"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"3"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"4"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"5"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"6"}} - // }; - - // expect_eq(resp, expected, "lrange "); - // } else { - // std::vector expected - // { {resp3::type::simple_string, 1UL, 0UL, {"QUEUED"}} }; - - // expect_eq(resp, expected, "lrange (inside transaction)"); - // } - // - // ++c; - //} - - //for (;;) { - // switch (cmd) { - // case command::multi: - // { - // std::vector expected - // { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; - - // expect_eq(resp, expected, "multi"); - // } break; - // case command::ping: - // { - // std::vector expected - // { {resp3::type::simple_string, 1UL, 0UL, {"QUEUED"}} }; - - // expect_eq(resp, expected, "ping"); - // } break; - // case command::set: - // { - // std::vector expected - // { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; - - // expect_eq(resp, expected, "set"); - // } break; - // case command::quit: - // { - // std::vector expected - // { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; - - // expect_eq(resp, expected, "quit"); - // } break; - // case command::flushall: - // { - // std::vector expected - // { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} }; - - // expect_eq(resp, expected, "flushall"); - // } break; - // case command::append: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"4"}} }; - - // expect_eq(resp, expected, "append"); - // } break; - // case command::hset: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"2"}} }; - - // expect_eq(resp, expected, "hset"); - // } break; - // case command::del: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"1"}} }; - - // expect_eq(resp, expected, "del"); - // } break; - // case command::incr: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"1"}} }; - - // expect_eq(resp, expected, "incr"); - // } break; - // case command::publish: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"1"}} }; - - // expect_eq(resp, expected, "publish"); - // } break; - // case command::hincrby: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"10"}} }; - - // expect_eq(resp, expected, "hincrby"); - // } break; - // case command::zadd: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"1"}} }; - - // expect_eq(resp, expected, "zadd"); - // } break; - // case command::sadd: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"3"}} }; - - // expect_eq(resp, expected, "sadd"); - // } break; - // case command::hdel: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"2"}} }; - - // expect_eq(resp, expected, "hdel"); - // } break; - // case command::zremrangebyscore: - // { - // std::vector expected - // { {resp3::type::number, 1UL, 0UL, {"1"}} }; - - // expect_eq(resp, expected, "zremrangebyscore"); - // } break; - // case command::get: - // { - // std::vector expected - // { {resp3::type::blob_string, 1UL, 0UL, test.set_} }; - - // expect_eq(resp, expected, "get"); - // } break; - // case command::hget: - // { - // std::vector expected - // { {resp3::type::blob_string, 1UL, 0UL, std::string{"value2"}} }; - - // expect_eq(resp, expected, "hget"); - // } break; - // case command::hvals: - // { - // std::vector expected - // { {resp3::type::array, 2UL, 0UL, {}} - // , {resp3::type::array, 1UL, 1UL, {"value1"}} - // , {resp3::type::array, 1UL, 1UL, {"value2"}} - // }; - - // expect_eq(resp, expected, "hvals"); - // } break; - // case command::zrange: - // { - // std::vector expected - // { {resp3::type::array, 1UL, 0UL, {}} - // , {resp3::type::blob_string, 1UL, 1UL, {"Marcelo"}} - // }; - - // expect_eq(resp, expected, "hvals"); - // } break; - // case command::zrangebyscore: - // { - // std::vector expected - // { {resp3::type::array, 1UL, 0UL, {}} - // , {resp3::type::blob_string, 1UL, 1UL, {"Marcelo"}} - // }; - - // expect_eq(resp, expected, "zrangebyscore"); - // } break; - // case command::exec: - // { - // std::vector expected - // { {resp3::type::array, 3UL, 0UL, {}} - // , {resp3::type::simple_string, 1UL, 1UL, {"PONG"}} - // , {resp3::type::array, 2UL, 1UL, {}} - // , {resp3::type::blob_string, 1UL, 2UL, {"4"}} - // , {resp3::type::blob_string, 1UL, 2UL, {"5"}} - // , {resp3::type::simple_string, 1UL, 1UL, {"PONG"}} - // }; - - // expect_eq(resp, expected, "transaction"); - - // } break; - // case command::hgetall: - // { - // std::vector expected - // { {resp3::type::map, 2UL, 0UL, {}} - // , {resp3::type::blob_string, 1UL, 1UL, {"field1"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"value1"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"field2"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"value2"}} - // }; - // expect_eq(resp, expected, "hgetall (value)"); - // } break; - // case command::smembers: - // { - // std::vector expected - // { {resp3::type::set, 3UL, 0UL, {}} - // , {resp3::type::blob_string, 1UL, 1UL, {"1"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"2"}} - // , {resp3::type::blob_string, 1UL, 1UL, {"3"}} - // }; - // expect_eq(resp, expected, "smembers (value)"); - // } break; - // default: { std::cout << "Error: " << resp.front().data_type << " " << cmd << std::endl; } - // } - - // resp.clear(); - //} + expect_eq(recv.counter, 1, "test_discard."); } -//------------------------------------------------------------------- - int main() { test_resolve_error(); @@ -687,12 +388,6 @@ int main() test_push2(); test_reconnect(); test_reconnect2(); - - //net::io_context ioc {1}; - //tcp::resolver resv(ioc); - //auto const res = resv.resolve("127.0.0.1", "6379"); - - //co_spawn(ioc, test_general(res), net::detached); - //ioc.run(); + test_discard(); }