2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Adds test to discard.

This commit is contained in:
Marcelo Zimbres
2022-04-23 16:24:25 +02:00
parent fd0cae92ee
commit a940f7f4bf
5 changed files with 70 additions and 378 deletions

View File

@@ -17,10 +17,10 @@ check_PROGRAMS += serialization_sync
check_PROGRAMS += intro_high_level
check_PROGRAMS += aggregates_high_level
check_PROGRAMS += test_low_level
check_PROGRAMS += test_high_level
if HAVE_CXX20
check_PROGRAMS += transaction
check_PROGRAMS += custom_adapter
check_PROGRAMS += test_high_level
endif
EXTRA_PROGRAMS =
@@ -45,8 +45,8 @@ serialization_sync_SOURCES = $(top_srcdir)/examples/serialization_sync.cpp
commands_SOURCES = $(top_srcdir)/tools/commands.cpp
subscriber_high_level_SOURCES = $(top_srcdir)/examples/subscriber_high_level.cpp
test_low_level_SOURCES = $(top_srcdir)/tests/low_level.cpp
if HAVE_CXX20
test_high_level_SOURCES = $(top_srcdir)/tests/high_level.cpp
if HAVE_CXX20
transaction_SOURCES = $(top_srcdir)/examples/transaction.cpp
subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp
custom_adapter_SOURCES = $(top_srcdir)/examples/custom_adapter.cpp

View File

@@ -12,6 +12,7 @@
#include <functional>
#include <iterator>
#include <algorithm>
#include <utility>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
@@ -21,10 +22,6 @@
#include <aedis/generic/detail/client_ops.hpp>
#include <aedis/redis/command.hpp>
// TODO: What to do if users send a discard command not contained in a
// transaction. The client object will try to pop the queue until a
// multi is found.
namespace aedis {
namespace generic {
@@ -115,10 +112,11 @@ public:
sr.push(cmd, args...);
auto const after = requests_.size();
assert(after - before != 0);
info_.back().size += after - before;;
auto const d = after - before;
info_.back().size += d;;
if (!has_push_response(cmd)) {
commands_.push_back(cmd);
commands_.push_back(std::make_pair(cmd, d));
++info_.back().cmds;
}
@@ -151,10 +149,11 @@ public:
sr.push_range2(cmd, key, begin, end);
auto const after = requests_.size();
assert(after - before != 0);
info_.back().size += after - before;;
auto const d = after - before;
info_.back().size += d;
if (!has_push_response(cmd)) {
commands_.push_back(cmd);
commands_.push_back(std::make_pair(cmd, d));
++info_.back().cmds;
}
@@ -186,10 +185,11 @@ public:
sr.push_range2(cmd, begin, end);
auto const after = requests_.size();
assert(after - before != 0);
info_.back().size += after - before;;
auto const d = after - before;
info_.back().size += d;
if (!has_push_response(cmd)) {
commands_.push_back(cmd);
commands_.push_back(std::make_pair(cmd, d));
++info_.back().cmds;
}
@@ -359,7 +359,7 @@ private:
++info_.front().cmds;
// Push front.
commands_.push_back(Command::hello);
commands_.push_back(std::make_pair(Command::hello, hello_size));
std::rotate(
std::begin(commands_),
std::prev(std::end(commands_)),
@@ -396,9 +396,6 @@ private:
// Returns true when the next request can be writen.
bool on_cmd(Command cmd)
{
// TODO: If the response to a discard is received we have to
// remove all commands up until multi.
assert(!info_.empty());
assert(!commands_.empty());
@@ -506,7 +503,7 @@ private:
std::string requests_;
// The commands contained in the requests.
std::vector<Command> commands_;
std::vector<std::pair<Command, std::size_t>> commands_;
// Info about the requests.
std::vector<info> info_;

View File

@@ -371,7 +371,7 @@ struct reader_op {
cli->cmd = Command::invalid;
if (cli->data_type != resp3::type::push) {
assert(!cli->commands_.empty());
cli->cmd = cli->commands_.front();
cli->cmd = cli->commands_.front().first;
}
yield cli->async_read(std::move(self));

View File

@@ -72,7 +72,7 @@ enum class command {
decrby,
/// https://redis.io/commands/del
del,
/// https://redis.io/commands/discard (not supported yet)
/// https://redis.io/commands/discard
discard,
/// https://redis.io/commands/dump
dump,

View File

@@ -317,366 +317,67 @@ void test_reconnect2()
ioc.run();
}
std::vector<node_type> gresp;
struct receiver7 {
public:
int counter = 0;
net::awaitable<void>
test_general(net::ip::tcp::resolver::results_type const& res)
receiver7(client_type& db)
: db_{&db}
, adapter_{adapt(counter)}
{}
void on_resp3(command cmd, node<boost::string_view> const& nd, boost::system::error_code& ec)
{
if (cmd == command::incr)
adapter_(nd, ec);
}
void on_write(std::size_t)
{
if (!std::exchange(sent_, true)) {
db_->send(command::del, "key");
db_->send(command::multi);
db_->send(command::ping, "aaa");
db_->send(command::incr, "key");
db_->send(command::ping, "bbb");
db_->send(command::discard);
db_->send(command::ping, "ccc");
db_->send(command::incr, "key");
db_->send(command::quit);
}
}
void on_read(command cmd, std::size_t)
{
}
private:
bool sent_ = false;
client_type* db_;
adapter_t<int> adapter_;
};
void test_discard()
{
auto ex = co_await net::this_coro::executor;
auto f = [](auto ec)
{
expect_error(ec, net::error::misc_errors::eof);
};
std::vector<int> list_ {1 ,2, 3, 4, 5, 6};
std::string set_ {"aaa"};
net::io_context ioc;
client_type db(ioc.get_executor());
//----------------------------------
std::string request;
auto sr = make_serializer(request);
sr.push(command::hello, 3);
sr.push(command::flushall);
sr.push_range(command::rpush, "a", list_);
sr.push(command::llen, "a");
sr.push(command::lrange, "a", 0, -1);
sr.push(command::ltrim, "a", 2, -2);
sr.push(command::lpop, "a");
//sr.lpop("a", 2); // Not working?
sr.push(command::set, "b", set_);
sr.push(command::get, "b");
sr.push(command::append, "b", "b");
sr.push(command::del, "b");
sr.push(command::subscribe, "channel");
sr.push(command::incr, "3");
receiver7 recv{db};
db.set_read_handler([&recv](auto cmd, std::size_t n){recv.on_read(cmd, n);});
db.set_write_handler([&recv](std::size_t n){recv.on_write(n);});
db.set_resp3_handler([&recv](auto a, auto b, auto c){recv.on_resp3(a, b, c);});
// transaction
for (auto i = 0; i < 3; ++i) {
sr.push(command::multi);
sr.push(command::ping);
sr.push(command::lrange, "a", 0, -1);
sr.push(command::ping);
// TODO: It looks like we can't publish to a channel we
// are already subscribed to from inside a transaction.
//req.publish("some-channel", "message1");
sr.push(command::exec);
}
db.async_run("127.0.0.1", "6379", f);
ioc.run();
std::map<std::string, std::string> m1 =
{ {"field1", "value1"}
, {"field2", "value2"}};
sr.push_range(command::hset, "d", m1);
sr.push(command::hget, "d", "field2");
sr.push(command::hgetall, "d");
sr.push(command::hdel, "d", "field1", "field2"); // TODO: Test as range too.
sr.push(command::hincrby, "e", "some-field", 10);
sr.push(command::zadd, "f", 1, "Marcelo");
sr.push(command::zrange, "f", 0, 1);
sr.push(command::zrangebyscore, "f", 1, 1);
sr.push(command::zremrangebyscore, "f", "-inf", "+inf");
auto const v = std::vector<int>{1, 2, 3};
sr.push_range(command::sadd, "g", v);
sr.push(command::smembers, "g");
sr.push(command::quit);
//----------------------------------
net::ip::tcp::socket socket{ex};
co_await net::async_connect(socket, res, net::use_awaitable);
co_await net::async_write(socket, net::buffer(request), net::use_awaitable);
// Reads the responses.
std::string buffer;
// hello
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(), net::use_awaitable);
// flushall
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(), net::use_awaitable);
{ // rpush:
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
auto const n = std::to_string(std::size(list_));
std::vector<node_type> expected
{ {resp3::type::number, 1UL, 0UL, n} };
expect_eq(resp, expected, "rpush (value)");
}
{ // llen
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
std::vector<node_type> expected
{ {resp3::type::number, 1UL, 0UL, {"6"}} };
expect_eq(resp, expected, "llen");
}
{ // lrange
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
std::vector<node_type> expected
{ {resp3::type::array, 6UL, 0UL, {}}
, {resp3::type::blob_string, 1UL, 1UL, {"1"}}
, {resp3::type::blob_string, 1UL, 1UL, {"2"}}
, {resp3::type::blob_string, 1UL, 1UL, {"3"}}
, {resp3::type::blob_string, 1UL, 1UL, {"4"}}
, {resp3::type::blob_string, 1UL, 1UL, {"5"}}
, {resp3::type::blob_string, 1UL, 1UL, {"6"}}
};
expect_eq(resp, expected, "lrange ");
}
{ // ltrim
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
std::vector<node_type> expected
{ {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
expect_eq(resp, expected, "ltrim");
}
{ // lpop
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
std::vector<node_type> expected
{ {resp3::type::blob_string, 1UL, 0UL, {"3"}} };
expect_eq(resp, expected, "lpop");
}
//{ // lpop
// std::vector<node_type> resp;
// co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
// std::vector<node_type> expected
// { {resp3::type::array, 2UL, 0UL, {}}
// , {resp3::type::array, 1UL, 1UL, {"4"}}
// , {resp3::type::array, 1UL, 1UL, {"5"}}
// };
// expect_eq(resp, expected, "lpop");
//}
//{ // lrange
// static int c = 0;
// if (c == 0) {
// std::vector<node_type> expected
// { {resp3::type::array, 6UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"1"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"2"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"3"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"4"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"5"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"6"}}
// };
// expect_eq(resp, expected, "lrange ");
// } else {
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"QUEUED"}} };
// expect_eq(resp, expected, "lrange (inside transaction)");
// }
//
// ++c;
//}
//for (;;) {
// switch (cmd) {
// case command::multi:
// {
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// expect_eq(resp, expected, "multi");
// } break;
// case command::ping:
// {
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"QUEUED"}} };
// expect_eq(resp, expected, "ping");
// } break;
// case command::set:
// {
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// expect_eq(resp, expected, "set");
// } break;
// case command::quit:
// {
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// expect_eq(resp, expected, "quit");
// } break;
// case command::flushall:
// {
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// expect_eq(resp, expected, "flushall");
// } break;
// case command::append:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"4"}} };
// expect_eq(resp, expected, "append");
// } break;
// case command::hset:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"2"}} };
// expect_eq(resp, expected, "hset");
// } break;
// case command::del:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// expect_eq(resp, expected, "del");
// } break;
// case command::incr:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// expect_eq(resp, expected, "incr");
// } break;
// case command::publish:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// expect_eq(resp, expected, "publish");
// } break;
// case command::hincrby:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"10"}} };
// expect_eq(resp, expected, "hincrby");
// } break;
// case command::zadd:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// expect_eq(resp, expected, "zadd");
// } break;
// case command::sadd:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"3"}} };
// expect_eq(resp, expected, "sadd");
// } break;
// case command::hdel:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"2"}} };
// expect_eq(resp, expected, "hdel");
// } break;
// case command::zremrangebyscore:
// {
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// expect_eq(resp, expected, "zremrangebyscore");
// } break;
// case command::get:
// {
// std::vector<node_type> expected
// { {resp3::type::blob_string, 1UL, 0UL, test.set_} };
// expect_eq(resp, expected, "get");
// } break;
// case command::hget:
// {
// std::vector<node_type> expected
// { {resp3::type::blob_string, 1UL, 0UL, std::string{"value2"}} };
// expect_eq(resp, expected, "hget");
// } break;
// case command::hvals:
// {
// std::vector<node_type> expected
// { {resp3::type::array, 2UL, 0UL, {}}
// , {resp3::type::array, 1UL, 1UL, {"value1"}}
// , {resp3::type::array, 1UL, 1UL, {"value2"}}
// };
// expect_eq(resp, expected, "hvals");
// } break;
// case command::zrange:
// {
// std::vector<node_type> expected
// { {resp3::type::array, 1UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"Marcelo"}}
// };
// expect_eq(resp, expected, "hvals");
// } break;
// case command::zrangebyscore:
// {
// std::vector<node_type> expected
// { {resp3::type::array, 1UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"Marcelo"}}
// };
// expect_eq(resp, expected, "zrangebyscore");
// } break;
// case command::exec:
// {
// std::vector<node_type> expected
// { {resp3::type::array, 3UL, 0UL, {}}
// , {resp3::type::simple_string, 1UL, 1UL, {"PONG"}}
// , {resp3::type::array, 2UL, 1UL, {}}
// , {resp3::type::blob_string, 1UL, 2UL, {"4"}}
// , {resp3::type::blob_string, 1UL, 2UL, {"5"}}
// , {resp3::type::simple_string, 1UL, 1UL, {"PONG"}}
// };
// expect_eq(resp, expected, "transaction");
// } break;
// case command::hgetall:
// {
// std::vector<node_type> expected
// { {resp3::type::map, 2UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"field1"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"value1"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"field2"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"value2"}}
// };
// expect_eq(resp, expected, "hgetall (value)");
// } break;
// case command::smembers:
// {
// std::vector<node_type> expected
// { {resp3::type::set, 3UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"1"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"2"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"3"}}
// };
// expect_eq(resp, expected, "smembers (value)");
// } break;
// default: { std::cout << "Error: " << resp.front().data_type << " " << cmd << std::endl; }
// }
// resp.clear();
//}
expect_eq(recv.counter, 1, "test_discard.");
}
//-------------------------------------------------------------------
int main()
{
test_resolve_error();
@@ -687,12 +388,6 @@ int main()
test_push2();
test_reconnect();
test_reconnect2();
//net::io_context ioc {1};
//tcp::resolver resv(ioc);
//auto const res = resv.resolve("127.0.0.1", "6379");
//co_spawn(ioc, test_general(res), net::detached);
//ioc.run();
test_discard();
}