2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Some improvements in the code.

This commit is contained in:
Marcelo Zimbres
2022-02-22 21:44:37 +01:00
parent feb383d7e2
commit c27b3b6b85
21 changed files with 451 additions and 750 deletions

View File

@@ -12,11 +12,8 @@ AM_LDFLAGS += -pthread
check_PROGRAMS =
check_PROGRAMS += intro
check_PROGRAMS += sets
check_PROGRAMS += hashes
check_PROGRAMS += stl_containers
check_PROGRAMS += serialization
check_PROGRAMS += multipurpose_response
check_PROGRAMS += lists
check_PROGRAMS += key_expiration
check_PROGRAMS += response_adapter
check_PROGRAMS += sync
@@ -37,11 +34,8 @@ CLEANFILES += $(EXTRA_PROGRAMS)
all: $(check_PROGRAMS) $(EXTRA_PROGRAMS)
intro_SOURCES = $(top_srcdir)/examples/intro.cpp
sets_SOURCES = $(top_srcdir)/examples/sets.cpp
hashes_SOURCES = $(top_srcdir)/examples/hashes.cpp
stl_containers_SOURCES = $(top_srcdir)/examples/stl_containers.cpp
serialization_SOURCES = $(top_srcdir)/examples/serialization.cpp
multipurpose_response_SOURCES = $(top_srcdir)/examples/multipurpose_response.cpp
lists_SOURCES = $(top_srcdir)/examples/lists.cpp
key_expiration_SOURCES = $(top_srcdir)/examples/key_expiration.cpp
response_adapter_SOURCES = $(top_srcdir)/examples/response_adapter.cpp
sync_SOURCES = $(top_srcdir)/examples/sync.cpp

View File

@@ -42,9 +42,6 @@
- transaction.cpp: Shows how to read the responses to a trasaction
efficiently. See also https://redis.io/topics/transactions.
- multipurpose_response.cpp: Shows how to read any responses to
Redis commands, including nested aggegagtes.
- subscriber.cpp: Shows how channel subscription works at a low
level. See also https://redis.io/topics/pubsub.

View File

@@ -42,31 +42,20 @@ auto adapt(T& t)
template <class Client>
struct run_op {
Client* cli;
// TODO: Move this to the client object.
std::string host;
std::string service;
net::coroutine coro;
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
{
reenter (coro) {
yield cli->async_resolve(host, service, std::move(self));
if (ec) {
self.complete(ec);
return;
}
yield cli->async_connect(std::move(self));
yield cli->socket_.async_connect(cli->endpoint_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
cli->send(command::hello, 3);
yield cli->async_read_write(std::move(self));
self.complete(ec);
}
}
@@ -103,60 +92,6 @@ struct read_write_op {
}
};
template <class Client>
struct connect_op {
using iterator_type =
typename Client::resolver_type::results_type::iterator;
Client* cli;
net::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, iterator_type iter = {})
{
reenter (coro) {
yield
cli->socket_.async_connect(
*std::cbegin(cli->results_),
std::move(self));
if (!ec)
cli->send(command::hello, 3);
self.complete(ec);
}
}
};
template <class Client>
struct resolve_op {
using results_type = typename Client::resolver_type::results_type;
Client* cli;
std::string host;
std::string service;
net::coroutine coro;
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, results_type results = {})
{
reenter (coro) {
yield cli->resolver_.async_resolve(host, service, std::move(self));
if (!ec)
cli->results_ = results;
self.complete(ec);
}
}
};
// Consider limiting the size of the pipelines by spliting that last
// one in two if needed.
template <class Client>
@@ -185,6 +120,7 @@ struct writer_op {
std::move(self));
if (ec) {
// TODO: Close the socket so that the reader can return.
self.complete(ec);
return;
}
@@ -198,7 +134,6 @@ struct writer_op {
cli->req_info_.erase(std::begin(cli->req_info_));
cli->wcallback_(size);
//self.complete({}, size);
yield cli->timer_.async_wait(std::move(self));
@@ -290,27 +225,18 @@ public:
//using stream_type = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using stream_type = AsyncReadWriteStream;
using executor_type = stream_type::executor_type;
using resolver_type = net::ip::tcp::resolver;
using default_completion_token_type = net::default_completion_token_t<executor_type>;
using writer_callback_type = std::function<void(std::size_t)>;
using reader_callback_type = std::function<void(command)>;
using response_adapter_type = std::function<void(command, resp3::type, std::size_t, std::size_t, char const*, std::size_t, std::error_code&)>;
private:
using results_type = typename resolver_type::results_type;
template <class T>
friend struct read_op;
template <class T>
friend struct writer_op;
template <class T>
friend struct resolve_op;
template <class T>
friend struct connect_op;
template <class T>
friend struct read_write_op;
@@ -344,8 +270,9 @@ private:
// Timer used to inform the write coroutine that it can write the
// next message in the output queue.
net::steady_timer timer_;
resolver_type resolver_;
results_type results_;
// Redis endpoint.
net::ip::tcp::endpoint endpoint_;
bool stop_writer_ = false;
@@ -416,30 +343,6 @@ private:
>(writer_op<client>{this}, token, socket_, timer_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_resolve(
std::string const& host = "localhost",
std::string const& service = "6379",
CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(resolve_op<client>{this, host, service}, token, resolver_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_connect(
CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(connect_op<client>{this}, token, socket_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_read_write(
@@ -448,7 +351,7 @@ private:
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(read_write_op<client>{this}, token, socket_, timer_, resolver_);
>(read_write_op<client>{this}, token, socket_, timer_);
}
public:
/** \brief Client constructor.
@@ -460,7 +363,6 @@ public:
client(net::any_io_executor ex)
: socket_{ex}
, timer_{ex}
, resolver_{ex}
{
timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
@@ -468,7 +370,7 @@ public:
/// Returns the executor used for I/O with Redis.
auto get_executor() {return socket_.get_executor();}
void set_writter_callback(writer_callback_type wcallback)
void set_writer_callback(writer_callback_type wcallback)
{ wcallback_ = std::move(wcallback); }
void set_reader_callback(reader_callback_type rcallback)
@@ -529,14 +431,14 @@ public:
template <class CompletionToken = default_completion_token_type>
auto
async_run(
std::string const& host = "localhost",
std::string const& service = "6379",
net::ip::tcp::endpoint ep,
CompletionToken token = CompletionToken{})
{
endpoint_ = ep;
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(run_op<client>{this, host, service}, token, socket_, timer_, resolver_);
>(run_op<client>{this}, token, socket_, timer_);
}
};

View File

@@ -30,11 +30,11 @@ template <class T>
typename std::enable_if<std::is_integral<T>::value, void>::type
from_string(
T& i,
char const* data,
char const* value,
std::size_t data_size,
std::error_code& ec)
{
auto const res = std::from_chars(data, data + data_size, i);
auto const res = std::from_chars(value, value + data_size, i);
if (res.ec != std::errc())
ec = std::make_error_code(res.ec);
}
@@ -43,11 +43,11 @@ template <class CharT, class Traits, class Allocator>
void
from_string(
std::basic_string<CharT, Traits, Allocator>& s,
char const* data,
char const* value,
std::size_t data_size,
std::error_code&)
{
s.assign(data, data_size);
s.assign(value, data_size);
}
void set_on_resp3_error(type t, std::error_code& ec)
@@ -91,27 +91,27 @@ public:
* Users who what to customize their response types are required to derive
* from this class and override this function, see examples.
*
* \param t The RESP3 type of the data.
* \param t The RESP3 type.
*
* \param n When t is an aggregate data type this will contain its size
* (see also element_multiplicity) for simple data types this is always 1.
* \param n When t is an aggregate type this will contain its size
* (see also element_multiplicity) for simple types this is always 1.
*
* \param depth The element depth in the tree.
*
* \param data A pointer to the data.
* \param value A pointer to the data.
*
* \param size The size of data.
* \param size The size of value.
*/
void
operator()(
type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
char const* value,
std::size_t size,
std::error_code&)
{
result_->emplace_back(t, aggregate_size, depth, std::string{data, size});
result_->emplace_back(t, aggregate_size, depth, std::string{value, size});
}
};
@@ -128,14 +128,14 @@ public:
type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
char const* value,
std::size_t data_size,
std::error_code&)
{
result_->data_type = t;
result_->aggregate_size = aggregate_size;
result_->depth = depth;
result_->data.assign(data, data_size);
result_->value.assign(value, data_size);
}
};
@@ -153,7 +153,7 @@ public:
type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
char const* value,
std::size_t data_size,
std::error_code& ec)
{
@@ -165,7 +165,7 @@ public:
}
assert(aggregate_size == 1);
from_string(*result_, data, data_size, ec);
from_string(*result_, value, data_size, ec);
}
};
@@ -182,7 +182,7 @@ public:
type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
char const* value,
std::size_t data_size,
std::error_code& ec)
{
@@ -206,7 +206,7 @@ public:
if (!result_->has_value())
*result_ = T{};
from_string(result_->value(), data, data_size, ec);
from_string(result_->value(), value, data_size, ec);
}
};
@@ -225,7 +225,7 @@ public:
operator()(type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
char const* value,
std::size_t data_size,
std::error_code& ec)
{
@@ -243,7 +243,7 @@ public:
} else {
assert(aggregate_size == 1);
from_string(result_->at(i_), data, data_size, ec);
from_string(result_->at(i_), value, data_size, ec);
++i_;
}
}
@@ -261,7 +261,7 @@ public:
operator()(type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
char const* value,
std::size_t data_size,
std::error_code& ec)
{
@@ -283,7 +283,7 @@ public:
}
result_->push_back({});
from_string(result_->back(), data, data_size, ec);
from_string(result_->back(), value, data_size, ec);
}
};
@@ -303,7 +303,7 @@ public:
operator()(type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
char const* value,
std::size_t data_size,
std::error_code& ec)
{
@@ -316,11 +316,12 @@ public:
assert(!is_aggregate(t));
// TODO: This should cause an error not an assertion.
assert(depth == 1);
assert(aggregate_size == 1);
typename Container::key_type obj;
from_string(obj, data, data_size, ec);
from_string(obj, value, data_size, ec);
if (hint_ == std::end(*result_)) {
auto const ret = result_->insert(std::move(obj));
hint_ = ret.first;
@@ -347,7 +348,7 @@ public:
operator()(type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
char const* value,
std::size_t data_size,
std::error_code& ec)
{
@@ -364,11 +365,11 @@ public:
if (on_key_) {
typename Container::key_type obj;
from_string(obj, data, data_size, ec);
from_string(obj, value, data_size, ec);
current_ = result_->insert(current_, {std::move(obj), {}});
} else {
typename Container::mapped_type obj;
from_string(obj, data, data_size, ec);
from_string(obj, value, data_size, ec);
current_->second = std::move(obj);
}

View File

@@ -1,67 +0,0 @@
/* Copyright (c) 2019 - 2022 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#pragma once
#include <aedis/resp3/node.hpp>
namespace aedis {
namespace resp3 {
std::string to_string(node const& in)
{
std::string out;
out += std::to_string(in.depth);
out += '\t';
out += to_string(in.data_type);
out += '\t';
out += std::to_string(in.aggregate_size);
out += '\t';
if (!is_aggregate(in.data_type))
out += in.data;
return out;
}
bool operator==(node const& a, node const& b)
{
return a.aggregate_size == b.aggregate_size
&& a.depth == b.depth
&& a.data_type == b.data_type
&& a.data == b.data;
};
std::ostream& operator<<(std::ostream& os, node const& o)
{
os << to_string(o);
return os;
}
std::ostream& operator<<(std::ostream& os, std::vector<node> const& r)
{
os << to_string(r);
return os;
}
// TODO: Output like in redis-cli.
std::string to_string(std::vector<node> const& vec)
{
if (std::empty(vec))
return {};
auto begin = std::cbegin(vec);
std::string res;
for (; begin != std::prev(std::cend(vec)); ++begin) {
res += to_string(*begin);
res += '\n';
}
res += to_string(*begin);
return res;
}
} // resp3
} // aedis

View File

@@ -21,6 +21,7 @@ namespace resp3 {
* Redis responses are the pre-order view of the response tree (see
* https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR).
*/
template <class String>
struct node {
/// The RESP3 type of the data in this node.
type data_type;
@@ -31,8 +32,8 @@ struct node {
/// The depth of this node in the response tree.
std::size_t depth;
/// The actual data. For aggregate data types this is always empty.
std::string data; // TODO: rename to value.
/// The actual data. For aggregate types this is always empty.
String value;
};
/** \brief Converts the node to a string.
@@ -40,29 +41,79 @@ struct node {
*
* \param obj The node object.
*/
std::string to_string(node const& obj);
template <class String>
std::string to_string(node<String> const& in)
{
std::string out;
out += std::to_string(in.depth);
out += '\t';
out += to_string(in.data_type);
out += '\t';
out += std::to_string(in.aggregate_size);
out += '\t';
if (!is_aggregate(in.data_type))
out += in.value;
return out;
}
/** \brief Compares a node for equality.
* \ingroup any
*/
bool operator==(node const& a, node const& b);
template <class String>
bool operator==(node<String> const& a, node<String> const& b)
{
return a.aggregate_size == b.aggregate_size
&& a.depth == b.depth
&& a.data_type == b.data_type
&& a.value == b.value;
};
/** \brief Writes the node to the stream.
* \ingroup any
*
* NOTE: Binary data is not converted to text.
*/
std::ostream& operator<<(std::ostream& os, node const& o);
template <class String>
std::ostream& operator<<(std::ostream& os, node<String> const& o)
{
os << to_string(o);
return os;
}
/** \brief Writes the response to the output stream
* \ingroup any
*
* TODO: Output like in redis-cli.
*/
template <class String>
std::string to_string(std::vector<node<String>> const& vec)
{
if (std::empty(vec))
return {};
auto begin = std::cbegin(vec);
std::string res;
for (; begin != std::prev(std::cend(vec)); ++begin) {
res += to_string(*begin);
res += '\n';
}
res += to_string(*begin);
return res;
}
/** \brief Writes the response to the output stream
* \ingroup any
*/
std::string to_string(std::vector<node> const& vec);
/** \brief Writes the response to the output stream
* \ingroup any
*/
std::ostream& operator<<(std::ostream& os, std::vector<node> const& r);
template <class String>
std::ostream& operator<<(std::ostream& os, std::vector<node<String>> const& r)
{
os << to_string(r);
return os;
}
} // resp3
} // aedis

View File

@@ -62,18 +62,18 @@ struct response_traits<std::vector<T, Allocator>>
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <>
struct response_traits<node>
template <class T>
struct response_traits<node<T>>
{
using response_type = node;
using response_type = node<T>;
using adapter_type = adapter::detail::adapter_node<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <class Allocator>
struct response_traits<std::vector<node, Allocator>>
template <class String, class Allocator>
struct response_traits<std::vector<node<String>, Allocator>>
{
using response_type = std::vector<node, Allocator>;
using response_type = std::vector<node<String>, Allocator>;
using adapter_type = adapter::detail::general<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};

View File

@@ -10,5 +10,4 @@
#include <aedis/redis/impl/command.ipp>
#include <aedis/sentinel/impl/command.ipp>
#include <aedis/resp3/impl/type.ipp>
#include <aedis/resp3/impl/node.ipp>
#include <aedis/resp3/detail/impl/parser.ipp>

View File

@@ -33,7 +33,7 @@ class receiver : public std::enable_shared_from_this<receiver> {
public:
private:
std::shared_ptr<client_type> db_;
std::vector<node> resps_;
std::vector<node<std::string>> resps_;
std::vector<std::shared_ptr<user_session_base>> sessions_;
public:
@@ -47,12 +47,12 @@ public:
break;
case command::incr:
std::cout << "Message so far: " << resps_.front().data << std::endl;
std::cout << "Message so far: " << resps_.front().value << std::endl;
break;
case command::unknown: // Server push
for (auto& session: sessions_)
session->deliver(resps_.at(3).data);
session->deliver(resps_.at(3).value);
break;
default: { /* Ignore */ }
@@ -78,11 +78,7 @@ net::awaitable<void> listener()
auto recv = std::make_shared<receiver>(db);
db->set_response_adapter(recv->adapter());
db->set_reader_callback([recv](command cmd) {recv->on_message(cmd);});
auto on_run = [](auto const ec)
{ std::clog << "Lost connection to redis: " << ec.message() << std::endl;};
db->async_run("localhost", "6379", on_run);
db->async_run({net::ip::make_address("127.0.0.1"), 6379}, [](auto){});
net::co_spawn(ex, signal_handler(acc, db), net::detached);

View File

@@ -8,6 +8,7 @@
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
@@ -20,16 +21,16 @@ namespace redis = aedis::redis;
using aedis::redis::command;
using aedis::redis::client;
using aedis::resp3::node;
using aedis::user_session;
using aedis::user_session_base;
// From lib/net_utils.hpp
using aedis::signal_handler;
using aedis::user_session;
using aedis::user_session_base;
using client_type = client<aedis::net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
class receiver : public std::enable_shared_from_this<receiver> {
private:
std::vector<node> resps_;
std::vector<node<std::string>> resps_;
std::queue<std::shared_ptr<user_session_base>> sessions_;
public:
@@ -37,12 +38,12 @@ public:
{
switch (cmd) {
case command::ping:
sessions_.front()->deliver(resps_.front().data);
sessions_.front()->deliver(resps_.front().value);
sessions_.pop();
break;
case command::incr:
std::cout << "Echos so far: " << resps_.front().data << std::endl;
std::cout << "Echos so far: " << resps_.front().value << std::endl;
break;
default:
@@ -59,27 +60,14 @@ public:
{ sessions_.push(session); }
};
net::awaitable<void> listener()
net::awaitable<void>
listener(
std::shared_ptr<tcp_acceptor> acc,
std::shared_ptr<client_type> db,
std::shared_ptr<receiver> recv)
{
auto ex = co_await net::this_coro::executor;
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<net::ip::tcp::acceptor>(ex, endpoint);
auto db = std::make_shared<client_type>(ex);
auto recv = std::make_shared<receiver>();
db->set_response_adapter(recv->adapter());
db->set_reader_callback([recv](command cmd) {recv->on_message(cmd);});
auto on_run = [](auto const ec)
{ std::clog << "Lost connection to redis: " << ec.message() << std::endl;};
db->async_run("localhost", "6379", on_run);
net::co_spawn(ex, signal_handler(acc, db), net::detached);
for (;;) {
auto socket = co_await acc->async_accept(net::use_awaitable);
auto socket = co_await acc->async_accept();
auto session = std::make_shared<user_session>(std::move(socket));
auto on_user_msg = [db, recv, session](std::string const& msg)
@@ -97,7 +85,26 @@ int main()
{
try {
net::io_context ioc;
co_spawn(ioc, listener(), net::detached);
auto db = std::make_shared<client_type>(ioc.get_executor());
auto recv = std::make_shared<receiver>();
db->set_response_adapter(recv->adapter());
db->set_reader_callback([recv](command cmd) {recv->on_message(cmd);});
db->async_run({net::ip::make_address("127.0.0.1"), 6379}, [](auto){});
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<tcp_acceptor>(ioc.get_executor(), endpoint);
net::signal_set signals(ioc.get_executor(), SIGINT, SIGTERM);
signals.async_wait([=](auto, int){
// Request redis to close the connection.
db->send(aedis::redis::command::quit);
// Stop the listener.
acc->cancel();
});
co_spawn(ioc, listener(acc, db, recv), net::detached);
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;

View File

@@ -1,81 +0,0 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <map>
#include <vector>
#include <iostream>
#include <unordered_map>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace net = aedis::net;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::redis::experimental::adapt;
// From lib/net_utils.hpp
using aedis::connection_manager;
using socket_type = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::socket>;
using client_type = client<socket_type>;
net::awaitable<void> reader(std::shared_ptr<client_type> db)
{
std::map<std::string, std::string> map
{ {"key1", "value1"}
, {"key2", "value2"}
, {"key3", "value3"}
};
// Enqueue the requests.
db->send(command::hello, 3);
db->send(command::flushall);
db->send_range(command::hset, "key", std::cbegin(map), std::cend(map));
db->send(command::hgetall, "key");
db->send(command::hgetall, "key");
db->send(command::hgetall, "key");
db->send(command::quit);
// The expected responses
int hset;
std::vector<std::string> hgetall1;
std::map<std::string, std::string> hgetall2;
std::unordered_map<std::string, std::string> hgetall3;
// Reads the responses.
co_await db->async_read(); // hello
co_await db->async_read(); // flushall
co_await db->async_read(adapt(hset));
co_await db->async_read(adapt(hgetall1));
co_await db->async_read(adapt(hgetall2));
co_await db->async_read(adapt(hgetall3));
co_await db->async_read(); // quit
// Reads eof (caused by the quit command).
boost::system::error_code ec;
co_await db->async_read(adapt(), net::redirect_error(net::use_awaitable, ec));
// Prints the responses.
std::cout << "hset: " << hset;
std::cout << "\nhgetall (as vector): ";
for (auto const& e: hgetall1) std::cout << e << ", ";
std::cout << "\nhgetall (as map): ";
for (auto const& e: hgetall2) std::cout << e.first << " ==> " << e.second << "; ";
std::cout << "\nhgetall (as unordered_map): ";
for (auto const& e: hgetall3) std::cout << e.first << " ==> " << e.second << "; ";
std::cout << "\n";
}
int main()
{
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
net::co_spawn(ioc, connection_manager(db, reader(db)), net::detached);
ioc.run();
}

View File

@@ -6,6 +6,7 @@
*/
#include <iostream>
#include <string>
#include <memory>
#include <aedis/aedis.hpp>
@@ -20,7 +21,7 @@ using client_type = client<aedis::net::ip::tcp::socket>;
class receiver {
private:
node resps_;
node<std::string> resps_;
std::shared_ptr<client_type> db_;
public:
@@ -36,21 +37,21 @@ public:
break;
case command::ping:
std::cout << "Ping message: " << resps_.data << std::endl;
std::cout << "Ping message: " << resps_.value << std::endl;
break;
case command::incr:
std::cout << "Ping counter: " << resps_.data << std::endl;
std::cout << "Ping counter: " << resps_.value << std::endl;
break;
case command::quit:
std::cout << command::quit << ": " << resps_.data << std::endl;
std::cout << command::quit << ": " << resps_.value << std::endl;
break;
default:;
}
resps_.data.clear();
resps_.value.clear();
}
auto adapter() { return redis::adapt(resps_); }
@@ -65,6 +66,6 @@ int main()
db->set_response_adapter(recv.adapter());
db->set_reader_callback(std::ref(recv));
db->async_run("localhost", "6379", [](auto){});
db->async_run({net::ip::make_address("127.0.0.1"), 6379}, [](auto){});
ioc.run();
}

View File

@@ -28,10 +28,10 @@ connect(
co_return std::move(socket);
}
template <class Socket>
template <class Acceptor, class Socket>
aedis::net::awaitable<void>
signal_handler(
std::shared_ptr<aedis::net::ip::tcp::acceptor> acc,
std::shared_ptr<Acceptor> acc,
std::shared_ptr<aedis::redis::client<Socket>> db)
{
auto ex = co_await aedis::net::this_coro::executor;

View File

@@ -1,83 +0,0 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <iostream>
#include <string>
#include <vector>
#include <list>
#include <deque>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace net = aedis::net;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::redis::experimental::adapt;
// From lib/net_utils.hpp
using aedis::connection_manager;
using socket_type = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::socket>;
using client_type = client<socket_type>;
net::awaitable<void> reader(std::shared_ptr<client_type> db)
{
auto vec = {1, 2, 3, 4, 5, 6};
// Enqueue the commands.
db->send(command::hello, 3);
db->send(command::flushall);
db->send_range(command::rpush, "key", std::cbegin(vec), std::cend(vec));
db->send(command::lrange, "key", 0, -1);
db->send(command::lrange, "key", 0, -1);
db->send(command::lrange, "key", 0, -1);
db->send(command::lrange, "key", 0, -1);
db->send(command::quit);
// Expected responses.
int rpush;
std::vector<std::string> svec;
std::list<std::string> slist;
std::deque<std::string> sdeq;
std::vector<int> ivec;
// Reads the responses.
co_await db->async_read(); // hello
co_await db->async_read(); // flushall
co_await db->async_read(adapt(rpush)); // rpush
co_await db->async_read(adapt(svec));
co_await db->async_read(adapt(slist));
co_await db->async_read(adapt(sdeq));
co_await db->async_read(adapt(ivec));
co_await db->async_read(); // quit
// Reads eof (caused by the quit command).
boost::system::error_code ec;
co_await db->async_read(adapt(), net::redirect_error(net::use_awaitable, ec));
// Prints the responses.
std::cout << "rpush: " << rpush;
std::cout << "\nlrange (as vector): ";
for (auto e: svec) std::cout << e << " ";
std::cout << "\nlrange (as list): ";
for (auto e: slist) std::cout << e << " ";
std::cout << "\nlrange (as deque): ";
for (auto e: sdeq) std::cout << e << " ";
std::cout << "\nlrange (as vector<int>): ";
for (auto e: ivec) std::cout << e << " ";
std::cout << std::endl;
}
int main()
{
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
net::co_spawn(ioc, connection_manager(db, reader(db)), net::detached);
ioc.run();
}

View File

@@ -1,86 +0,0 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <iostream>
#include <vector>
#include <tuple>
#include <array>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace resp3 = aedis::resp3;
using aedis::redis::command;
using aedis::redis::make_serializer;
using resp3::adapt;
using resp3::node;
namespace net = aedis::net;
using net::async_write;
using net::buffer;
using net::dynamic_buffer;
// From lib/net_utils.hpp
using aedis::connect;
// The response to some commands can't be transalated into C++ data
// structures like \c std::string or STL containers, A multipurpose
// response is provided for such cases.
net::awaitable<void> multipurpose_response()
{
try {
auto socket = co_await connect();
auto list = {"one", "two", "three"};
std::string request;
auto sr = make_serializer(request);
sr.push(command::hello, 3);
sr.push(command::flushall);
sr.push(command::multi);
sr.push(command::ping, "Some message");
sr.push(command::incr, "incr-key");
sr.push_range(command::rpush, "list-key", std::cbegin(list), std::cend(list));
sr.push(command::lrange, "list-key", 0, -1);
sr.push(command::exec);
sr.push(command::quit);
co_await async_write(socket, buffer(request));
// Expected responses.
std::vector<node> exec;
// Reads the response.
std::string buffer;
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // hello
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // flushall
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // multi
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // ping
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // incr
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // rpush
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // lrange
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(exec));
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // quit
// Prints the response.
std::cout << "General format:\n";
for (auto const& e: exec) std::cout << e << "\n";
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
exit(EXIT_FAILURE);
}
}
int main()
{
net::io_context ioc;
co_spawn(ioc, multipurpose_response(), net::detached);
ioc.run();
}

View File

@@ -1,78 +0,0 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <set>
#include <vector>
#include <iostream>
#include <unordered_map>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace net = aedis::net;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::redis::experimental::adapt;
// From lib/net_utils.hpp
using aedis::connection_manager;
using socket_type = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::socket>;
using client_type = client<socket_type>;
net::awaitable<void> reader(std::shared_ptr<client_type> db)
{
std::set<std::string> set
{"one", "two", "three", "four"};
// Enqueue the commands.
db->send(command::hello, 3);
db->send(command::flushall);
db->send_range(command::sadd, "key", std::cbegin(set), std::cend(set));
db->send(command::smembers, "key");
db->send(command::smembers, "key");
db->send(command::smembers, "key");
db->send(command::quit);
// Expected responses.
int sadd;
std::vector<std::string> smembers1;
std::set<std::string> smembers2;
std::unordered_set<std::string> smembers3;
// Reads the responses.
co_await db->async_read(); // hello
co_await db->async_read(); // flushall
co_await db->async_read(adapt(sadd));
co_await db->async_read(adapt(smembers1));
co_await db->async_read(adapt(smembers2));
co_await db->async_read(adapt(smembers3));
co_await db->async_read(); // quit
// Reads eof (caused by the quit command).
boost::system::error_code ec;
co_await db->async_read(adapt(), net::redirect_error(net::use_awaitable, ec));
// Prints the responses.
std::cout << "sadd: " << sadd;
std::cout << "\nsmembers (as vector): ";
for (auto const& e: smembers1) std::cout << e << " ";
std::cout << "\nsmembers (as set): ";
for (auto const& e: smembers2) std::cout << e << " ";
std::cout << "\nsmembers (as unordered_set): ";
for (auto const& e: smembers3) std::cout << e << " ";
std::cout << "\n";
}
int main()
{
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
net::co_spawn(ioc, connection_manager(db, reader(db)), net::detached);
ioc.run();
}

154
examples/stl_containers.cpp Normal file
View File

@@ -0,0 +1,154 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <map>
#include <vector>
#include <memory>
#include <iostream>
#include <array>
#include <unordered_map>
#include <tuple>
#include <variant>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
namespace net = aedis::net;
namespace redis = aedis::redis;
using aedis::redis::command;
using aedis::redis::client;
using aedis::resp3::node;
using aedis::resp3::response_traits_t;
using client_type = client<aedis::net::ip::tcp::socket>;
namespace aedis {
namespace redis {
template <class Tuple>
class custom_adapter {
private:
using variant_type =
boost::mp11::mp_rename<boost::mp11::mp_transform<response_traits_t, Tuple>, std::variant>;
std::array<variant_type, std::tuple_size<Tuple>::value> adapters_;
public:
custom_adapter(Tuple* r)
{ resp3::adapter::detail::assigner<std::tuple_size<Tuple>::value - 1>::assign(adapters_, *r); }
void
operator()(
command cmd,
resp3::type t,
std::size_t aggregate_size,
std::size_t depth,
char const* data,
std::size_t size,
std::error_code& ec)
{
int i = -1;
switch (cmd) {
case command::lrange: i = 0; break;
case command::smembers: i = 1; break;
default: i = 2;
}
std::visit([&](auto& arg){arg(t, aggregate_size, depth, data, size, ec);}, adapters_[i]);
}
};
}
}
class receiver {
private:
std::shared_ptr<client_type> db_;
using responses_type =
std::tuple<
std::list<int>,
std::set<std::string>,
std::vector<node<std::string>>>;
responses_type resps_;
std::map<std::string, std::string> map
{ {"key1", "value1"}
, {"key2", "value2"}
, {"key3", "value3"}
};
std::vector<int> vec
{1, 2, 3, 4, 5, 6};
std::set<std::string> set
{"one", "two", "three", "four"};
public:
receiver(std::shared_ptr<client_type> db) : db_{db} {}
auto adapter() { return redis::custom_adapter<responses_type>(&resps_); }
void on_hello()
{
db_->send_range(command::hset, "hset-key", std::cbegin(map), std::cend(map));
db_->send_range(command::rpush, "rpush-key", std::cbegin(vec), std::cend(vec));
db_->send_range(command::sadd, "sadd-key", std::cbegin(set), std::cend(set));
db_->send(command::hgetall, "hset-key");
db_->send(command::lrange, "rpush-key", 0, -1);
db_->send(command::smembers, "sadd-key");
db_->send(command::quit);
}
void on_hgetall()
{
//for (auto const& e: hgetall3) std::cout << e.first << " ==> " << e.second << "; ";
std::cout << "\n";
}
void on_lrange()
{
std::cout << "\n";
for (auto const& e: std::get<0>(resps_)) std::cout << e << " ";
std::cout << "\n";
std::get<0>(resps_).clear();
}
void on_smembers()
{
std::cout << "\n";
for (auto const& e: std::get<1>(resps_)) std::cout << e << " ";
std::cout << "\n";
std::get<1>(resps_).clear();
}
void operator()(command cmd)
{
switch (cmd) {
case command::hello: on_hello(); break;
case command::hgetall: on_hgetall(); break;
case command::lrange: on_lrange(); break;
case command::smembers: on_smembers(); break;
default:;
}
}
};
int main()
{
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
receiver recv{db};
db->set_response_adapter(recv.adapter());
db->set_reader_callback(std::ref(recv));
db->async_run({net::ip::make_address("127.0.0.1"), 6379}, [](auto){});
ioc.run();
}

View File

@@ -6,84 +6,76 @@
*/
#include <iostream>
#include <chrono>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace resp3 = aedis::resp3;
using aedis::redis::command;
using aedis::redis::make_serializer;
using resp3::adapt;
using resp3::node;
namespace net = aedis::net;
using net::async_write;
using net::buffer;
using net::dynamic_buffer;
// From lib/net_utils.hpp
using aedis::connect;
namespace redis = aedis::redis;
using aedis::redis::command;
using aedis::redis::client;
using aedis::resp3::node;
using client_type = client<aedis::net::ip::tcp::socket>;
/* In this example we send a subscription to a channel and start
reading server side messages indefinitely.
Notice we store the id of the connection (attributed by the redis
server) to be able to identify it (in the logs for example).
After starting the example you can test it by sending messages with
redis-cli like this
$ redis-cli -3
127.0.0.1:6379> PUBLISH channel1 some-message
(integer) 3
127.0.0.1:6379>
The messages will then appear on the terminal you are running the
example.
* reading server side messages indefinitely.
*
* After starting the example you can test it by sending messages with
* redis-cli like this
*
* $ redis-cli -3
* 127.0.0.1:6379> PUBLISH channel1 some-message
* (integer) 3
* 127.0.0.1:6379>
*
* The messages will then appear on the terminal you are running the
* example.
*/
net::awaitable<void> subscriber()
{
try {
auto socket = co_await connect();
std::string request;
auto sr = make_serializer(request);
sr.push(command::hello, "3");
sr.push(command::subscribe, "channel1", "channel2");
co_await async_write(socket, buffer(request));
class receiver {
private:
std::vector<node<std::string>> resps_;
std::shared_ptr<client_type> db_;
std::vector<node> resp;
public:
receiver(std::shared_ptr<client_type> db) : db_{db} {}
// Reads the response to the hello command.
std::string buffer;
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(resp));
co_await resp3::async_read(socket, dynamic_buffer(buffer));
// Saves the id of this connection.
auto const id = resp.at(8).data;
// Loops to receive server pushes.
for (;;) {
resp.clear();
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(resp));
void operator()(command cmd)
{
switch (cmd) {
case command::hello:
db_->send(command::subscribe, "channel1", "channel2");
break;
case command::unknown:
std::cout
<< "Subscriber " << id << ":\n"
<< resp << std::endl;
<< "Event: " << resps_.at(1).value << "\n"
<< "Channel: " << resps_.at(2).value << "\n"
<< "Message: " << resps_.at(3).value << "\n"
<< std::endl;
break;
default:;
}
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
resps_.clear();
}
}
auto adapter() { return redis::adapt(resps_); }
};
int main()
{
net::io_context ioc;
co_spawn(ioc, subscriber(), net::detached);
co_spawn(ioc, subscriber(), net::detached);
co_spawn(ioc, subscriber(), net::detached);
auto db = std::make_shared<client_type>(ioc.get_executor());
receiver recv{db};
db->set_response_adapter(recv.adapter());
db->set_reader_callback(std::ref(recv));
db->async_run({net::ip::make_address("127.0.0.1"), 6379}, [](auto){});
ioc.run();
}

View File

@@ -25,7 +25,8 @@ namespace this_coro = net::this_coro;
using namespace aedis;
using namespace aedis::resp3;
std::vector<node> gresp;
using node_type = node<std::string>;
std::vector<node_type> gresp;
//-------------------------------------------------------------------
@@ -36,9 +37,9 @@ void simple_string_sync()
auto dbuffer = net::dynamic_buffer(rbuffer);
{
node result;
node_type result;
resp3::read(ts, dbuffer, adapt(result));
node expected {resp3::type::simple_string, 1UL, 0UL, {"OK"}};
node_type expected {resp3::type::simple_string, 1UL, 0UL, {"OK"}};
check_equal(result, expected, "simple_string (node-sync)");
}
@@ -64,9 +65,9 @@ void simple_string_sync_empty()
auto dbuffer = net::dynamic_buffer(rbuffer);
{
node result;
node_type result;
resp3::read(ts, dbuffer, adapt(result));
node expected {resp3::type::simple_string, 1UL, 0UL, {""}};
node_type expected {resp3::type::simple_string, 1UL, 0UL, {""}};
check_equal(result, expected, "simple_string (empty-node-sync)");
}
@@ -92,9 +93,9 @@ net::awaitable<void> simple_string_async()
auto dbuffer = net::dynamic_buffer(rbuffer);
{
node result;
node_type result;
co_await resp3::async_read(ts, dbuffer, adapt(result));
node expected {resp3::type::simple_string, 1UL, 0UL, {"OK"}};
node_type expected {resp3::type::simple_string, 1UL, 0UL, {"OK"}};
check_equal(result, expected, "simple_string (node-async)");
}
@@ -120,9 +121,9 @@ net::awaitable<void> simple_string_async_empty()
auto dbuffer = net::dynamic_buffer(rbuffer);
{
node result;
node_type result;
co_await resp3::async_read(ts, dbuffer, adapt(result));
node expected {resp3::type::simple_string, 1UL, 0UL, {""}};
node_type expected {resp3::type::simple_string, 1UL, 0UL, {""}};
check_equal(result, expected, "simple_string (empty-node-async)");
}
@@ -156,9 +157,9 @@ net::awaitable<void> test_simple_error_async()
{
test_tcp_socket ts {"-Error\r\n"};
node result;
node_type result;
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(result));
node expected {resp3::type::simple_error, 1UL, 0UL, {"Error"}};
node_type expected {resp3::type::simple_error, 1UL, 0UL, {"Error"}};
check_equal(result, expected, "simple_error (node-async)");
}
@@ -190,9 +191,9 @@ net::awaitable<void> test_number()
{
test_tcp_socket ts {":-3\r\n"};
node result;
node_type result;
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(result));
node expected {resp3::type::number, 1UL, 0UL, {"-3"}};
node_type expected {resp3::type::number, 1UL, 0UL, {"-3"}};
check_equal(result, expected, "number (node-async)");
}
@@ -220,11 +221,11 @@ net::awaitable<void> array_async()
auto dbuf = net::dynamic_buffer(buf);
{
std::vector<node> result;
std::vector<node_type> result;
boost::system::error_code ec;
co_await resp3::async_read(ts, dbuf, adapt(result), net::redirect_error(net::use_awaitable, ec));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::array, 3UL, 0UL, {}}
, {resp3::type::blob_string, 1UL, 1UL, {"11"}}
, {resp3::type::blob_string, 1UL, 1UL, {"22"}}
@@ -263,7 +264,7 @@ net::awaitable<void> test_blob_string()
std::string cmd {"$2\r\nhh\r\n"};
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_string, 1UL, 0UL, {"hh"}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
check_equal(gresp, expected, "blob_string");
@@ -273,7 +274,7 @@ net::awaitable<void> test_blob_string()
std::string cmd {"$26\r\nhhaa\aaaa\raaaaa\r\naaaaaaaaaa\r\n"};
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_string, 1UL, 0UL, {"hhaa\aaaa\raaaaa\r\naaaaaaaaaa"}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
check_equal(gresp, expected, "blob_string (with separator)");
@@ -283,7 +284,7 @@ net::awaitable<void> test_blob_string()
std::string cmd {"$0\r\n\r\n"};
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_string, 1UL, 0UL, {}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
check_equal(gresp, expected, "blob_string (size 0)");
@@ -297,8 +298,8 @@ net::awaitable<void> test_floating_point()
{
std::string cmd {",1.23\r\n"};
test_tcp_socket ts {cmd};
std::vector<node> resp;
std::vector<node> expected
std::vector<node_type> resp;
std::vector<node_type> expected
{ {resp3::type::doublean, 1UL, 0UL, {"1.23"}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp));
check_equal(resp, expected, "double");
@@ -307,9 +308,9 @@ net::awaitable<void> test_floating_point()
{
std::string cmd {",inf\r\n"};
test_tcp_socket ts {cmd};
std::vector<node> resp;
std::vector<node_type> resp;
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::doublean, 1UL, 0UL, {"inf"}} };
check_equal(resp, expected, "double (inf)");
}
@@ -317,9 +318,9 @@ net::awaitable<void> test_floating_point()
{
std::string cmd {",-inf\r\n"};
test_tcp_socket ts {cmd};
std::vector<node> resp;
std::vector<node_type> resp;
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::doublean, 1UL, 0UL, {"-inf"}} };
check_equal(resp, expected, "double (-inf)");
}
@@ -333,8 +334,8 @@ net::awaitable<void> test_boolean()
{
std::string cmd {"#f\r\n"};
test_tcp_socket ts {cmd};
std::vector<node> resp;
std::vector<node> expected
std::vector<node_type> resp;
std::vector<node_type> expected
{ {resp3::type::boolean, 1UL, 0UL, {"f"}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp));
@@ -344,8 +345,8 @@ net::awaitable<void> test_boolean()
{
std::string cmd {"#t\r\n"};
test_tcp_socket ts {cmd};
std::vector<node> resp;
std::vector<node> expected
std::vector<node_type> resp;
std::vector<node_type> expected
{ {resp3::type::boolean, 1UL, 0UL, {"t"}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp));
@@ -361,7 +362,7 @@ net::awaitable<void> test_blob_error()
std::string cmd {"!21\r\nSYNTAX invalid syntax\r\n"};
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_error, 1UL, 0UL, {"SYNTAX invalid syntax"}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
check_equal(gresp, expected, "blob_error (message)");
@@ -371,7 +372,7 @@ net::awaitable<void> test_blob_error()
std::string cmd {"!0\r\n\r\n"};
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_error, 1UL, 0UL, {}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
@@ -387,7 +388,7 @@ net::awaitable<void> test_verbatim_string()
std::string cmd {"=15\r\ntxt:Some string\r\n"};
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::verbatim_string, 1UL, 0UL, {"txt:Some string"}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
check_equal(gresp, expected, "verbatim_string");
@@ -398,7 +399,7 @@ net::awaitable<void> test_verbatim_string()
test_tcp_socket ts {cmd};
gresp.clear();
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::verbatim_string, 1UL, 0UL, {}} };
check_equal(gresp, expected, "verbatim_string (empty)");
}
@@ -413,7 +414,7 @@ net::awaitable<void> test_set2()
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::set, 5UL, 0UL, {}}
, {resp3::type::simple_string, 1UL, 1UL, {"orange"}}
, {resp3::type::simple_string, 1UL, 1UL, {"apple"}}
@@ -431,7 +432,7 @@ net::awaitable<void> test_set2()
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::set, 0UL, 0UL, {}}
};
@@ -496,7 +497,7 @@ net::awaitable<void> test_map()
gresp.clear();
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::map, 7UL, 0UL, {}}
, {resp3::type::blob_string, 1UL, 1UL, {"server"}}
, {resp3::type::blob_string, 1UL, 1UL, {"redis"}}
@@ -521,7 +522,7 @@ net::awaitable<void> test_map()
test_tcp_socket ts {cmd};
gresp.clear();
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::map, 0UL, 0UL, {}} };
check_equal(gresp, expected, "test map (empty)");
}
@@ -535,7 +536,7 @@ net::awaitable<void> test_streamed_string()
std::string cmd {"$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;1\r\nd\r\n;0\r\n"};
test_tcp_socket ts {cmd};
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::streamed_string_part, 1UL, 0UL, {"Hello world"}} };
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(gresp));
check_equal(gresp, expected, "streamed string");
@@ -544,10 +545,10 @@ net::awaitable<void> test_streamed_string()
{
std::string cmd {"$?\r\n;0\r\n"};
test_tcp_socket ts {cmd};
std::vector<node> resp;
std::vector<node_type> resp;
co_await resp3::async_read(ts, net::dynamic_buffer(buf), adapt(resp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::streamed_string_part, 1UL, 0UL, {}} };
check_equal(resp, expected, "streamed string (empty)");
}
@@ -588,9 +589,9 @@ net::awaitable<void> optional_async()
auto dbuf = net::dynamic_buffer(buf);
{
node result;
node_type result;
co_await resp3::async_read(ts, dbuf, adapt(result));
node expected {resp3::type::null, 1UL, 0UL, {""}};
node_type expected {resp3::type::null, 1UL, 0UL, {""}};
check_equal(result, expected, "optional (node-async)");
}

View File

@@ -25,7 +25,8 @@ using namespace aedis::resp3;
using aedis::redis::command;
using aedis::redis::make_serializer;
std::vector<node> gresp;
using node_type = node<std::string>;
std::vector<node_type> gresp;
//-------------------------------------------------------------------
@@ -102,28 +103,28 @@ test_general(net::ip::tcp::resolver::results_type const& res)
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(), net::use_awaitable);
{ // rpush:
std::vector<node> resp;
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
auto const n = std::to_string(std::size(list_));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::number, 1UL, 0UL, n} };
check_equal(resp, expected, "rpush (value)");
}
{ // llen
std::vector<node> resp;
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::number, 1UL, 0UL, {"6"}} };
check_equal(resp, expected, "llen");
}
{ // lrange
std::vector<node> resp;
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::array, 6UL, 0UL, {}}
, {resp3::type::blob_string, 1UL, 1UL, {"1"}}
, {resp3::type::blob_string, 1UL, 1UL, {"2"}}
@@ -137,30 +138,30 @@ test_general(net::ip::tcp::resolver::results_type const& res)
}
{ // ltrim
std::vector<node> resp;
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
check_equal(resp, expected, "ltrim");
}
{ // lpop
std::vector<node> resp;
std::vector<node_type> resp;
co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_string, 1UL, 0UL, {"3"}} };
check_equal(resp, expected, "lpop");
}
//{ // lpop
// std::vector<node> resp;
// std::vector<node_type> resp;
// co_await resp3::async_read(socket, net::dynamic_buffer(buffer), adapt(resp), net::use_awaitable);
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::array, 2UL, 0UL, {}}
// , {resp3::type::array, 1UL, 1UL, {"4"}}
// , {resp3::type::array, 1UL, 1UL, {"5"}}
@@ -173,7 +174,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// static int c = 0;
// if (c == 0) {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::array, 6UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"1"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"2"}}
@@ -185,7 +186,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// check_equal(resp, expected, "lrange ");
// } else {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"QUEUED"}} };
// check_equal(resp, expected, "lrange (inside transaction)");
@@ -198,126 +199,126 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// switch (cmd) {
// case command::multi:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// check_equal(resp, expected, "multi");
// } break;
// case command::ping:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"QUEUED"}} };
// check_equal(resp, expected, "ping");
// } break;
// case command::set:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// check_equal(resp, expected, "set");
// } break;
// case command::quit:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// check_equal(resp, expected, "quit");
// } break;
// case command::flushall:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// check_equal(resp, expected, "flushall");
// } break;
// case command::append:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"4"}} };
// check_equal(resp, expected, "append");
// } break;
// case command::hset:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"2"}} };
// check_equal(resp, expected, "hset");
// } break;
// case command::del:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// check_equal(resp, expected, "del");
// } break;
// case command::incr:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// check_equal(resp, expected, "incr");
// } break;
// case command::publish:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// check_equal(resp, expected, "publish");
// } break;
// case command::hincrby:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"10"}} };
// check_equal(resp, expected, "hincrby");
// } break;
// case command::zadd:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// check_equal(resp, expected, "zadd");
// } break;
// case command::sadd:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"3"}} };
// check_equal(resp, expected, "sadd");
// } break;
// case command::hdel:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"2"}} };
// check_equal(resp, expected, "hdel");
// } break;
// case command::zremrangebyscore:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"1"}} };
// check_equal(resp, expected, "zremrangebyscore");
// } break;
// case command::get:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::blob_string, 1UL, 0UL, test.set_} };
// check_equal(resp, expected, "get");
// } break;
// case command::hget:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::blob_string, 1UL, 0UL, std::string{"value2"}} };
// check_equal(resp, expected, "hget");
// } break;
// case command::hvals:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::array, 2UL, 0UL, {}}
// , {resp3::type::array, 1UL, 1UL, {"value1"}}
// , {resp3::type::array, 1UL, 1UL, {"value2"}}
@@ -327,7 +328,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// } break;
// case command::zrange:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::array, 1UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"Marcelo"}}
// };
@@ -336,7 +337,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// } break;
// case command::zrangebyscore:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::array, 1UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"Marcelo"}}
// };
@@ -345,7 +346,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// } break;
// case command::exec:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::array, 3UL, 0UL, {}}
// , {resp3::type::simple_string, 1UL, 1UL, {"PONG"}}
// , {resp3::type::array, 2UL, 1UL, {}}
@@ -359,7 +360,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// } break;
// case command::hgetall:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::map, 2UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"field1"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"value1"}}
@@ -370,7 +371,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// } break;
// case command::smembers:
// {
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::set, 3UL, 0UL, {}}
// , {resp3::type::blob_string, 1UL, 1UL, {"1"}}
// , {resp3::type::blob_string, 1UL, 1UL, {"2"}}
@@ -416,14 +417,14 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// { // flushall
// gresp.clear();
// co_await async_read(socket, buf, gresp);
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// check_equal(gresp, expected, "flushall");
// }
//
// { // rpush
// gresp.clear();
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::number, 1UL, 0UL, {"6"}} };
// co_await async_read(socket, buf, gresp);
// check_equal(gresp, expected, "rpush");
@@ -445,7 +446,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
//
// { // ltrim
// gresp.clear();
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
//
// co_await async_read(socket, buf, gresp);
@@ -454,7 +455,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
//
// { // lpop. Why a blob string instead of a number?
// gresp.clear();
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::blob_string, 1UL, 0UL, {"3"}} };
//
// co_await async_read(socket, buf, gresp);
@@ -464,7 +465,7 @@ test_general(net::ip::tcp::resolver::results_type const& res)
// { // quit
// gresp.clear();
// co_await async_read(socket, buf, gresp);
// std::vector<node> expected
// std::vector<node_type> expected
// { {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
// check_equal(gresp, expected, "ltrim");
// }
@@ -511,14 +512,14 @@ test_set(net::ip::tcp::resolver::results_type const& results)
{ // set
gresp.clear();
co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
check_equal(gresp, expected, "set1");
}
{ // get
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_string, 1UL, 0UL, test_bulk1} };
co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp));
@@ -528,14 +529,14 @@ test_set(net::ip::tcp::resolver::results_type const& results)
{ // set
gresp.clear();
co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
check_equal(gresp, expected, "ltrim");
}
{ // get
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_string, 1UL, 0UL, test_bulk2} };
co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp));
check_equal(gresp, expected, "get2");
@@ -544,14 +545,14 @@ test_set(net::ip::tcp::resolver::results_type const& results)
{ // set
gresp.clear();
co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
check_equal(gresp, expected, "set3");
}
{ // get
gresp.clear();
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::blob_string, 1UL, 0UL, {}} };
co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp));
@@ -561,7 +562,7 @@ test_set(net::ip::tcp::resolver::results_type const& results)
{ // quit
gresp.clear();
co_await resp3::async_read(socket, net::dynamic_buffer(buf), adapt(gresp));
std::vector<node> expected
std::vector<node_type> expected
{ {resp3::type::simple_string, 1UL, 0UL, {"OK"}} };
check_equal(gresp, expected, "quit");
}