2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Fixes many bugs in the adapters.

This commit is contained in:
Marcelo Zimbres
2022-03-04 23:21:02 +01:00
parent 54ba34c70c
commit 0723922ac1
18 changed files with 332 additions and 468 deletions

View File

@@ -15,12 +15,10 @@ check_PROGRAMS += intro
check_PROGRAMS += aggregates
check_PROGRAMS += stl_containers
check_PROGRAMS += serialization
check_PROGRAMS += key_expiration
check_PROGRAMS += receiver
check_PROGRAMS += sync
check_PROGRAMS += test_offline
check_PROGRAMS += test_online
check_PROGRAMS += transaction
EXTRA_PROGRAMS =
EXTRA_PROGRAMS += subscriber
@@ -38,7 +36,6 @@ intro_SOURCES = $(top_srcdir)/examples/intro.cpp
aggregates_SOURCES = $(top_srcdir)/examples/aggregates.cpp
stl_containers_SOURCES = $(top_srcdir)/examples/stl_containers.cpp
serialization_SOURCES = $(top_srcdir)/examples/serialization.cpp
key_expiration_SOURCES = $(top_srcdir)/examples/key_expiration.cpp
receiver_SOURCES = $(top_srcdir)/examples/receiver.cpp
sync_SOURCES = $(top_srcdir)/examples/sync.cpp
commands_SOURCES = $(top_srcdir)/tools/commands.cpp
@@ -47,13 +44,12 @@ test_online_SOURCES = $(top_srcdir)/tests/online.cpp
subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp
echo_server_SOURCES = $(top_srcdir)/examples/echo_server.cpp
chat_room_SOURCES = $(top_srcdir)/examples/chat_room.cpp
transaction_SOURCES = $(top_srcdir)/examples/transaction.cpp
nobase_include_HEADERS =\
$(top_srcdir)/aedis/config.hpp\
$(top_srcdir)/aedis/src.hpp\
$(top_srcdir)/aedis/redis/command.hpp\
$(top_srcdir)/aedis/redis/receiver_tuple.hpp\
$(top_srcdir)/aedis/redis/receiver.hpp\
$(top_srcdir)/aedis/redis/detail/client_ops.hpp\
$(top_srcdir)/aedis/sentinel/command.hpp\
$(top_srcdir)/aedis/aedis.hpp\
@@ -76,8 +72,7 @@ nobase_include_HEADERS =\
$(top_srcdir)/aedis/redis/client.hpp
nobase_noinst_HEADERS =\
$(top_srcdir)/examples/lib/net_utils.hpp\
$(top_srcdir)/examples/lib/user_session.hpp\
$(top_srcdir)/examples/user_session.hpp\
$(top_srcdir)/tests/check.hpp\
$(top_srcdir)/tests/test_stream.hpp

View File

@@ -17,7 +17,7 @@
#include <aedis/resp3/serializer.hpp>
#include <aedis/resp3/response_traits.hpp>
#include <aedis/redis/client.hpp>
#include <aedis/redis/receiver_tuple.hpp>
#include <aedis/redis/receiver.hpp>
/** \mainpage Documentation
\tableofcontents
@@ -25,12 +25,55 @@
\section Overview
Aedis is low-level redis client library built on top of Boost.Asio
that implements communication with a Redis server over its native
protocol RESP3. It has first-class support for STL containers and
C++ built in types among other things. You will be able to
implement your own redis client or use a general purpose provided
by the library. For more information about Redis see
https://redis.io/
that implements communication with a Redis server over the latests
version of its protocol RESP3. Some of its most important features
are
1. First class support for asynchronous communication.
2. Support for STL containers.
3. Serialization and deserialization of your own data types built directly in the parser to avoid unnecessary copies.
4. Client class that encapsulates handling of requests for the user.
5. etc.
For more information about Redis see https://redis.io/
\section tutorial Tutorial
The general structure of a program involves writing a receiver like this
@code
class myreceiver : receiver<std::vector<node<std::string>>> {
public:
void on_read(command cmd) override
{
switch (cmd) {
case command::hello: on_hello(); break;
case command::set: on_set(); break;
case command::get: on_get(); break;
...
default:
}
}
};
@endcode
and to start communication with Redis
@code
int main()
{
net::io_context ioc;
client<net::ip::tcp::socket> db(ioc.get_executor());
myreceiver recv;
db.async_run(
recv,
{net::ip::make_address("127.0.0.1"), 6379},
[](auto ec){ std::cout << ec.message() << std::endl;});
ioc.run();
}
@endcode
\section examples Examples

View File

@@ -24,16 +24,22 @@ namespace redis {
* \ingroup any
*/
template <class ...Ts>
class receiver_tuple {
class receiver {
private:
using tuple_type = std::tuple<Ts...>;
using variant_type = boost::mp11::mp_rename<boost::mp11::mp_transform<resp3::response_traits_t, tuple_type>, std::variant>;
tuple_type resps_;
std::array<variant_type, std::tuple_size<tuple_type>::value> adapters_;
bool on_transaction_ = false;
protected:
virtual int to_tuple_index(command cmd) { return 0; }
virtual void on_read_impl(command) {}
virtual void on_write_impl(std::size_t) {}
virtual int to_tuple_idx_impl(command) { return 0;}
public:
receiver()
{ resp3::adapter::detail::assigner<std::tuple_size<tuple_type>::value - 1>::assign(adapters_, resps_); }
template <class T>
auto& get() { return std::get<T>(resps_);};
@@ -41,12 +47,8 @@ protected:
template <class T>
auto const& get() const { return std::get<T>(resps_);};
template <class T>
constexpr int index_of() const {return boost::mp11::mp_find<tuple_type, T>::value;}
public:
receiver_tuple()
{ resp3::adapter::detail::assigner<std::tuple_size<tuple_type>::value - 1>::assign(adapters_, resps_); }
template <class T>
constexpr int index_of() const {return boost::mp11::mp_find<tuple_type, T>::value;}
void
on_resp3(
@@ -65,8 +67,34 @@ public:
std::visit([&](auto& arg){arg(t, aggregate_size, depth, data, size, ec);}, adapters_[i]);
}
virtual void on_read(command) {}
virtual void on_write(std::size_t) {}
void on_read(command cmd)
{
if (on_transaction_)
return;
on_read_impl(cmd);
}
void on_write(std::size_t n)
{
on_write_impl(n);
}
int to_tuple_index(command cmd)
{
if (cmd == command::multi) {
on_transaction_ = true;
return -1;
}
if (cmd == command::exec)
on_transaction_ = false;
if (on_transaction_)
return -1;
return to_tuple_idx_impl(cmd);
}
};
} // redis

View File

@@ -158,6 +158,8 @@ public:
std::error_code& ec)
{
set_on_resp3_error(t, ec);
if (ec)
return;
if (is_aggregate(t)) {
ec = adapter::error::expects_simple_type;
@@ -187,6 +189,8 @@ public:
std::error_code& ec)
{
set_on_resp3_error2(t, ec);
if (ec)
return;
if (is_aggregate(t)) {
ec = adapter::error::expects_simple_type;
@@ -230,6 +234,8 @@ public:
std::error_code& ec)
{
set_on_resp3_error(t, ec);
if (ec)
return;
if (is_aggregate(t)) {
if (i_ != -1) {
@@ -241,7 +247,10 @@ public:
result_->resize(m * aggregate_size);
++i_;
} else {
assert(aggregate_size == 1);
if (aggregate_size != 1) {
ec = adapter::error::nested_unsupported;
return;
}
from_string(result_->at(i_), value, data_size, ec);
++i_;
@@ -266,18 +275,23 @@ public:
std::error_code& ec)
{
set_on_resp3_error(t, ec);
if (ec)
return;
if (is_aggregate(t)) {
if (depth != 0) {
if (depth != 0 && depth != 1) {
ec = adapter::error::nested_unsupported;
return;
}
return;
}
assert(aggregate_size == 1);
if (aggregate_size != 1) {
ec = adapter::error::nested_unsupported;
return;
}
if (depth != 1) {
if (depth < 1) {
ec = adapter::error::nested_unsupported;
return;
}
@@ -308,6 +322,8 @@ public:
std::error_code& ec)
{
set_on_resp3_error(t, ec);
if (ec)
return;
if (t == type::set) {
assert(depth == 0);
@@ -353,15 +369,27 @@ public:
std::error_code& ec)
{
set_on_resp3_error(t, ec);
if (ec)
return;
if (t == type::map) {
assert(depth == 0);
if (is_aggregate(t)) {
assert(t == type::map);
if (depth != 0 && depth != 1) {
ec = adapter::error::nested_unsupported;
return;
}
return;
}
assert(!is_aggregate(t));
assert(depth == 1);
assert(aggregate_size == 1);
if (aggregate_size != 1) {
ec = adapter::error::nested_unsupported;
return;
}
if (depth < 1) {
ec = adapter::error::nested_unsupported;
return;
}
if (on_key_) {
typename Container::key_type obj;

View File

@@ -39,7 +39,7 @@ struct node {
/** \brief Converts the node to a string.
* \ingroup any
*
* \param obj The node object.
* \param in The node object.
*/
template <class String>
std::string to_string(node<String> const& in)

View File

@@ -162,6 +162,7 @@ struct assigner<0> {
}
};
// TODO: Produce error if resposes before exec are not QUEUED.
template <class Tuple>
class flat_transaction_adapter {
private:
@@ -180,7 +181,7 @@ public:
{
if (depth == 1) {
if (is_aggregate(t))
aggregate_size_ = aggregate_size;
aggregate_size_ = element_multiplicity(t) * aggregate_size;
else
++i_;
@@ -201,10 +202,8 @@ public:
std::error_code& ec)
{
if (depth == 0) {
if (aggregate_size != std::tuple_size<Tuple>::value) {
if (aggregate_size != std::tuple_size<Tuple>::value)
ec = adapter::error::incompatible_tuple_size;
return;
}
return;
}

View File

@@ -15,12 +15,13 @@
namespace net = aedis::net;
using aedis::redis::command;
using aedis::redis::receiver_tuple;
using aedis::redis::receiver;
using aedis::resp3::node;
using client_type = aedis::redis::client<net::ip::tcp::socket>;
using response_type = std::vector<node<std::string>>;
void print_flat_aggregate(response_type const& v)
// Prints aggregates that don't contain nested aggregates.
void print_aggregate(response_type const& v)
{
auto const m = element_multiplicity(v.front().data_type);
for (auto i = 0lu; i < m * v.front().aggregate_size; ++i)
@@ -28,14 +29,14 @@ void print_flat_aggregate(response_type const& v)
std::cout << "\n";
}
struct receiver : receiver_tuple<response_type> {
struct myreceiver : receiver<response_type> {
public:
myreceiver(client_type& db) : db_{&db} {}
private:
client_type* db_;
public:
receiver(client_type& db) : db_{&db} {}
void on_read(command cmd) override
void on_read_impl(command cmd) override
{
switch (cmd) {
case command::hello:
@@ -67,7 +68,7 @@ public:
case command::lrange:
case command::smembers:
case command::hgetall:
print_flat_aggregate(get<response_type>());
print_aggregate(get<response_type>());
break;
default:;
@@ -81,7 +82,7 @@ int main()
{
net::io_context ioc;
client_type db{ioc.get_executor()};
receiver recv{db};
myreceiver recv{db};
db.async_run(
recv,

View File

@@ -11,32 +11,25 @@
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/user_session.hpp"
#include "lib/net_utils.hpp"
#include "user_session.hpp"
namespace net = aedis::net;
namespace redis = aedis::redis;
using redis::receiver_tuple;
using aedis::redis::receiver;
using aedis::redis::command;
using aedis::redis::client;
using aedis::resp3::node;
using aedis::resp3::type;
using aedis::user_session;
using aedis::user_session_base;
// From lib/net_utils.hpp
using aedis::signal_handler;
using client_type = redis::client<net::detached_t::as_default_on_t<aedis::net::ip::tcp::socket>>;
using client_type = aedis::redis::client<aedis::net::ip::tcp::socket>;
using response_type = std::vector<node<std::string>>;
class receiver : public receiver_tuple<response_type>, std::enable_shared_from_this<receiver> {
class myreceiver : public receiver<response_type> {
public:
private:
std::shared_ptr<client_type> db_;
std::vector<std::shared_ptr<user_session_base>> sessions_;
public:
receiver(std::shared_ptr<client_type> db) : db_{db} {}
myreceiver(std::shared_ptr<client_type> db) : db_{db} {}
void on_message(command cmd)
{
@@ -46,7 +39,7 @@ public:
break;
case command::incr:
std::cout << "Message so far: " << get<response_type>().front().value << std::endl;
std::cout << "Messages so far: " << get<response_type>().front().value << std::endl;
break;
case command::invalid: // Server push
@@ -64,18 +57,12 @@ public:
{ sessions_.push_back(session); }
};
net::awaitable<void> listener()
net::awaitable<void>
listener(
std::shared_ptr<net::ip::tcp::acceptor> acc,
std::shared_ptr<client_type> db,
std::shared_ptr<myreceiver> recv)
{
auto ex = co_await net::this_coro::executor;
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<net::ip::tcp::acceptor>(ex, endpoint);
auto db = std::make_shared<client_type>(ex);
auto recv = std::make_shared<receiver>(db);
db->async_run(*recv);
net::co_spawn(ex, signal_handler(acc, db), net::detached);
auto on_user_msg = [db](std::string const& msg)
{
db->send(command::publish, "channel", msg);
@@ -94,7 +81,25 @@ int main()
{
try {
net::io_context ioc{1};
co_spawn(ioc, listener(), net::detached);
auto db = std::make_shared<client_type>(ioc.get_executor());
auto recv = std::make_shared<myreceiver>(db);
db->async_run(
*recv,
{net::ip::make_address("127.0.0.1"), 6379},
[](auto ec){ std::cout << ec.message() << std::endl;});
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<net::ip::tcp::acceptor>(ioc.get_executor(), endpoint);
co_spawn(ioc, listener(acc, db, recv), net::detached);
net::signal_set signals(ioc.get_executor(), SIGINT, SIGTERM);
signals.async_wait([=] (auto, int) {
db->send(command::quit);
acc->cancel();
});
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;

View File

@@ -13,24 +13,19 @@
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/user_session.hpp"
#include "lib/net_utils.hpp"
#include "user_session.hpp"
namespace net = aedis::net;
namespace redis = aedis::redis;
using redis::receiver_tuple;
using aedis::redis::receiver;
using aedis::redis::command;
using aedis::redis::client;
using aedis::resp3::node;
// From lib/net_utils.hpp
using aedis::user_session;
using aedis::user_session_base;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
using client_type = redis::client<net::detached_t::as_default_on_t<aedis::net::ip::tcp::socket>>;
using client_type = aedis::redis::client<aedis::net::ip::tcp::socket>;
using response_type = std::vector<node<std::string>>;
class receiver : public redis::receiver_tuple<response_type>, std::enable_shared_from_this<receiver> {
class myreceiver : public receiver<response_type> {
private:
std::queue<std::shared_ptr<user_session_base>> sessions_;
@@ -59,12 +54,12 @@ public:
net::awaitable<void>
listener(
std::shared_ptr<tcp_acceptor> acc,
std::shared_ptr<net::ip::tcp::acceptor> acc,
std::shared_ptr<client_type> db,
std::shared_ptr<receiver> recv)
std::shared_ptr<myreceiver> recv)
{
for (;;) {
auto socket = co_await acc->async_accept();
auto socket = co_await acc->async_accept(net::use_awaitable);
auto session = std::make_shared<user_session>(std::move(socket));
auto on_user_msg = [db, recv, session](std::string const& msg)
@@ -84,24 +79,23 @@ int main()
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
auto recv = std::make_shared<receiver>();
auto recv = std::make_shared<myreceiver>();
db->async_run(*recv);
db->async_run(
*recv,
{net::ip::make_address("127.0.0.1"), 6379},
[](auto ec){ std::cout << ec.message() << std::endl;});
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<tcp_acceptor>(ioc.get_executor(), endpoint);
auto acc = std::make_shared<net::ip::tcp::acceptor>(ioc.get_executor(), endpoint);
co_spawn(ioc, listener(acc, db, recv), net::detached);
net::signal_set signals(ioc.get_executor(), SIGINT, SIGTERM);
signals.async_wait([=](auto, int){
// Request redis to close the connection.
db->send(aedis::redis::command::quit);
// Stop the listener.
signals.async_wait([=] (auto, int) {
db->send(command::quit);
acc->cancel();
});
co_spawn(ioc, listener(acc, db, recv), net::detached);
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;

View File

@@ -13,21 +13,19 @@
#include <aedis/src.hpp>
namespace net = aedis::net;
using aedis::resp3::node;
using aedis::redis::command;
using aedis::redis::receiver_tuple;
using aedis::redis::receiver;
using client_type = aedis::redis::client<net::ip::tcp::socket>;
using response_type = std::vector<node<std::string>>;
using response_type = aedis::resp3::node<std::string>;
struct myreceiver : receiver<response_type> {
public:
myreceiver(client_type& db): db_{&db} {}
struct receiver : receiver_tuple<response_type> {
private:
client_type* db_;
public:
receiver(client_type& db): db_{&db} {}
void on_read(command cmd) override
void on_read_impl(command cmd) override
{
switch (cmd) {
case command::hello:
@@ -39,10 +37,8 @@ public:
break;
default:
std::cout << get<response_type>().front().value << std::endl;
std::cout << get<response_type>().value << std::endl;
}
get<response_type>().clear();
}
};
@@ -50,7 +46,7 @@ int main()
{
net::io_context ioc;
client_type db(ioc.get_executor());
receiver recv{db};
myreceiver recv{db};
db.async_run(
recv,

View File

@@ -1,96 +0,0 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <iostream>
#include <chrono>
#include <optional>
#include <aedis/src.hpp>
#include <aedis/aedis.hpp>
#include "lib/net_utils.hpp"
namespace resp3 = aedis::resp3;
using aedis::redis::command;
using aedis::redis::make_serializer;
using resp3::adapt;
using resp3::node;
namespace net = aedis::net;
using net::async_write;
using net::buffer;
using net::dynamic_buffer;
// From lib/net_utils.hpp
using aedis::connect;
net::awaitable<void> key_expiration()
{
try {
auto socket = co_await connect();
// Creates and sends the first request.
std::string request;
auto sr = make_serializer(request);
sr.push(command::hello, 3);
sr.push(command::flushall);
sr.push(command::set, "key", "Some payload", "EX", "2");
sr.push(command::get, "key");
co_await async_write(socket, buffer(request));
// Will hold the response to get.
std::optional<std::string> get;
// Reads the responses.
std::string rbuffer;
co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // hello
co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // flushall
co_await resp3::async_read(socket, dynamic_buffer(rbuffer)); // set
co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(get));
std::cout
<< "Before expiration: " << get.has_value() << ", "
<< *get << std::endl;
// Waits some seconds for the key to expire.
timer tm{socket.get_executor(), std::chrono::seconds{3}};
co_await tm.async_wait();
// Creates a request to get after expiration.
get.reset(); request.clear();
sr.push(command::get, "key");
sr.push(command::get, "key");
sr.push(command::quit);
co_await async_write(socket, buffer(request));
// Reads the response to the second request.
co_await resp3::async_read(socket, dynamic_buffer(rbuffer), adapt(get));
// Reading without an optional will result in an error.
std::string str;
boost::system::error_code ec;
co_await resp3::async_read(socket, dynamic_buffer(rbuffer),
adapt(str), net::redirect_error(net::use_awaitable, ec));
// Quit
co_await resp3::async_read(socket, dynamic_buffer(rbuffer));
std::cout << "After expiration (optional): " << get.has_value() << "\n";
std::cout << "After expiration (non-optional): " << ec.message() << "\n";
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
exit(EXIT_FAILURE);
}
}
int main()
{
net::io_context ioc;
co_spawn(ioc, key_expiration(), net::detached);
ioc.run();
}

View File

@@ -1,86 +0,0 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <aedis/config.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
using tcp_socket = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::socket>;
using tcp_resolver = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::resolver>;
using timer = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::steady_timer>;
namespace aedis
{
aedis::net::awaitable<tcp_socket>
connect(
std::string host = "127.0.0.1",
std::string port = "6379")
{
auto ex = co_await net::this_coro::executor;
tcp_resolver resolver{ex};
auto const res = co_await resolver.async_resolve(host, port);
tcp_socket socket{ex};
co_await aedis::net::async_connect(socket, res);
co_return std::move(socket);
}
template <class Acceptor, class Socket>
aedis::net::awaitable<void>
signal_handler(
std::shared_ptr<Acceptor> acc,
std::shared_ptr<aedis::redis::client<Socket>> db)
{
auto ex = co_await aedis::net::this_coro::executor;
aedis::net::signal_set signals(ex, SIGINT, SIGTERM);
boost::system::error_code ec;
co_await signals.async_wait(net::redirect_error(net::use_awaitable, ec));
// Closes the connection with redis.
db->send(aedis::redis::command::quit);
// Stop listening for new connections.
acc->cancel();
}
template <class T, class Socket>
net::awaitable<void>
connection_manager(
std::shared_ptr<redis::client<Socket>> db,
net::awaitable<T> reader)
{
using namespace net::experimental::awaitable_operators;
auto ex = co_await net::this_coro::executor;
tcp_resolver resolver{ex};
auto const res = co_await resolver.async_resolve("localhost", "6379");
co_await net::async_connect(db->next_layer(), std::cbegin(res), std::end(res));
co_await (db->async_writer() || std::move(reader));
}
template <class T>
void print_and_clear(std::set<T>& cont)
{
std::cout << "\n";
for (auto const& e: cont) std::cout << e << " ";
std::cout << "\n";
cont.clear();
}
template <class T>
void print_and_clear(std::list<T>& cont)
{
std::cout << "\n";
for (auto const& e: cont) std::cout << e << " ";
std::cout << "\n";
cont.clear();
}
} // aedis

View File

@@ -36,14 +36,13 @@ struct receiver {
void on_read(command cmd)
{
std::cout << "on_read: " << cmd << std::endl;
// Use the response and then clear it for the next command.
resps_.clear();
switch (cmd) {
case command::hello: db->send(command::quit);
case command::hello: db->send(command::quit); break;
default:;
}
std::cout << "on_read: " << cmd << std::endl;
resps_.clear();
}
void on_write(std::size_t n)

View File

@@ -13,8 +13,8 @@
namespace net = aedis::net;
using aedis::redis::command;
using aedis::redis::receiver_tuple;
using client_type = aedis::redis::client<net::detached_t::as_default_on_t<aedis::net::ip::tcp::socket>>;
using aedis::redis::receiver;
using client_type = aedis::redis::client<aedis::net::ip::tcp::socket>;
// Arbitrary struct to de/serialize.
struct mystruct {
@@ -46,35 +46,46 @@ void from_string(mystruct& obj, char const* data, std::size_t size, std::error_c
obj.b = 2;
}
// One tuple element for each expected request.
using receiver_tuple_type =
receiver_tuple<
mystruct, // get
std::list<mystruct>, // lrange
std::set<mystruct>, // smembers
std::map<std::string, mystruct> // hgetall
using transaction_type =
std::tuple<
mystruct,
std::vector<mystruct>,
std::map<std::string, mystruct>
>;
struct receiver : receiver_tuple_type {
// One tuple element for each expected request.
using receiver_type =
receiver<
std::optional<mystruct>, // get
std::list<mystruct>, // lrange
std::set<mystruct>, // smembers
std::map<std::string, mystruct>, // hgetall
transaction_type // exec
>;
struct myreceiver : receiver_type {
public:
myreceiver(client_type& db) : db_{&db} {}
private:
client_type* db_;
int to_tuple_index(command cmd) override
int to_tuple_idx_impl(command cmd) override
{
switch (cmd) {
case command::get: return index_of<mystruct>();
case command::get: return index_of<std::optional<mystruct>>();
case command::lrange: return index_of<std::list<mystruct>>();
case command::smembers: return index_of<std::set<mystruct>>();
case command::hgetall: return index_of<std::map<std::string, mystruct>>();
case command::exec: return index_of<transaction_type>();
default: return -1;
}
}
public:
receiver(client_type& db) : db_{&db} {}
void on_read(command cmd) override
void on_read_impl(command cmd) override
{
std::cout << cmd << "\n";
switch (cmd) {
case command::hello:
{
@@ -92,7 +103,7 @@ public:
{{1, 2}, {3, 4}, {5, 6}};
// Sends
db_->send(command::set, "serialization-var-key", var);
db_->send(command::set, "serialization-var-key", var, "EX", "2");
db_->send_range(command::hset, "serialization-hset-key", std::cbegin(map), std::cend(map));
db_->send_range(command::rpush, "serialization-rpush-key", std::cbegin(vec), std::cend(vec));
db_->send_range(command::sadd, "serialization-sadd-key", std::cbegin(set), std::cend(set));
@@ -103,24 +114,34 @@ public:
db_->send(command::lrange, "serialization-rpush-key", 0, -1);
db_->send(command::smembers, "serialization-sadd-key");
// quits
db_->send(command::quit);
// Transaction
db_->send(command::multi);
db_->send(command::get, "serialization-var-key");
db_->send(command::lrange, "serialization-rpush-key", 0, -1);
db_->send(command::hgetall, "serialization-hset-key");
db_->send(command::exec);
} break;
case command::get:
std::cout << get<mystruct>() << std::endl;
break;
{
if (get<std::optional<mystruct>>().has_value()) {
std::cout << get<std::optional<mystruct>>().value() << "\n\n";
get<std::optional<mystruct>>().reset();
} else {
std::cout << "Expired." << "\n";
}
} break;
case command::lrange:
for (auto const& e: get<std::list<mystruct>>())
std::cout << e << " ";
std::cout << e << "\n";
std::cout << "\n";
get<std::list<mystruct>>().clear();
break;
case command::smembers:
for (auto const& e: get<std::set<mystruct>>())
std::cout << e << " ";
std::cout << e << "\n";
std::cout << "\n";
get<std::set<mystruct>>().clear();
break;
@@ -132,6 +153,19 @@ public:
get<std::map<std::string, mystruct>>().clear();
break;
case command::exec:
{
std::cout
<< "First element: \n"
<< std::get<mystruct>(get<transaction_type>()) << "\n";
std::cout << "Second element: \n";
for (auto const& e: std::get<std::vector<mystruct>>(get<transaction_type>()))
std::cout << e << "\n";
std::cout << "\n";
std::get<std::vector<mystruct>>(get<transaction_type>()).clear();
} break;
default:;
}
}
@@ -141,12 +175,19 @@ int main()
{
net::io_context ioc;
client_type db(ioc.get_executor());
receiver recv{db};
myreceiver recv{db};
db.async_run(
recv,
{net::ip::make_address("127.0.0.1"), 6379},
[](auto ec){ std::cout << ec.message() << std::endl;});
net::steady_timer tm{ioc, std::chrono::seconds{3}};
tm.async_wait([&db](auto ec){
db.send(command::get, "serialization-var-key");
db.send(command::quit);
});
ioc.run();
}

View File

@@ -7,85 +7,86 @@
#include <map>
#include <vector>
#include <memory>
#include <iostream>
#include <unordered_map>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace net = aedis::net;
namespace redis = aedis::redis;
using aedis::redis::command;
using aedis::redis::receiver_tuple;
using aedis::redis::receiver;
using aedis::resp3::node;
using client_type = redis::client<net::detached_t::as_default_on_t<aedis::net::ip::tcp::socket>>;
void send_containers(std::shared_ptr<client_type> db)
// Helper function.
template <class Container>
void print_and_clear(Container& cont)
{
std::map<std::string, std::string> map
{ {"key1", "value1"}
, {"key2", "value2"}
, {"key3", "value3"}
};
std::vector<int> vec
{1, 2, 3, 4, 5, 6};
std::set<std::string> set
{"one", "two", "three", "four"};
// Sends the stl containers.
db->send_range(command::hset, "hset-key", std::cbegin(map), std::cend(map));
db->send_range(command::rpush, "rpush-key", std::cbegin(vec), std::cend(vec));
db->send_range(command::sadd, "sadd-key", std::cbegin(set), std::cend(set));
// Retrieves the containers.
db->send(command::hgetall, "hset-key");
db->send(command::lrange, "rpush-key", 0, -1);
db->send(command::smembers, "sadd-key");
std::cout << "\n";
for (auto const& e: cont) std::cout << e << " ";
std::cout << "\n";
cont.clear();
}
// Grouping of all expected responses in a tuple.
using receiver_tuple_type =
receiver_tuple<
using receiver_type =
receiver<
std::list<int>,
std::set<std::string>,
std::vector<node<std::string>>
>;
struct receiver : receiver_tuple_type {
struct myreceiver : receiver_type {
public:
myreceiver(client_type& db) : db_{&db} {}
private:
std::shared_ptr<client_type> db_;
int to_tuple_index(command cmd) override
{
switch (cmd) {
case command::lrange: return index_of<std::list<int>>();
case command::smembers: return index_of<std::set<std::string>>();
default: return -1;
}
}
int to_tuple_idx_impl(command cmd) override
{
switch (cmd) {
case command::lrange: return index_of<std::list<int>>();
case command::smembers: return index_of<std::set<std::string>>();
default: return -1;
}
}
public:
receiver(std::shared_ptr<client_type> db) : db_{db} {}
void on_read(command cmd) override
void on_read_impl(command cmd) override
{
switch (cmd) {
case command::hello:
send_containers(db_);
db_->send(command::quit);
break;
{
std::map<std::string, std::string> map
{ {"key1", "value1"}
, {"key2", "value2"}
, {"key3", "value3"}
};
std::vector<int> vec
{1, 2, 3, 4, 5, 6};
std::set<std::string> set
{"one", "two", "three", "four"};
// Sends the stl containers.
db_->send_range(command::hset, "hset-key", std::cbegin(map), std::cend(map));
db_->send_range(command::rpush, "rpush-key", std::cbegin(vec), std::cend(vec));
db_->send_range(command::sadd, "sadd-key", std::cbegin(set), std::cend(set));
//_ Retrieves the containers.
db_->send(command::hgetall, "hset-key");
db_->send(command::lrange, "rpush-key", 0, -1);
db_->send(command::smembers, "sadd-key");
db_->send(command::quit);
} break;
case command::lrange:
aedis::print_and_clear(get<std::list<int>>());
print_and_clear(get<std::list<int>>());
break;
case command::smembers:
aedis::print_and_clear(get<std::set<std::string>>());
print_and_clear(get<std::set<std::string>>());
break;
default:;
@@ -96,8 +97,14 @@ public:
int main()
{
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
receiver recv{db};
db->async_run(recv);
client_type db{ioc.get_executor()};
myreceiver recv{db};
db.async_run(
recv,
{net::ip::make_address("127.0.0.1"), 6379},
[](auto ec){ std::cout << ec.message() << std::endl;});
ioc.run();
}

View File

@@ -10,11 +10,9 @@
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace net = aedis::net;
using aedis::redis::command;
using aedis::redis::receiver_tuple;
using aedis::redis::receiver;
using aedis::resp3::node;
using client_type = aedis::redis::client<aedis::net::ip::tcp::socket>;
using response_type = std::vector<node<std::string>>;
@@ -34,14 +32,14 @@ using response_type = std::vector<node<std::string>>;
* example.
*/
class receiver : public receiver_tuple<response_type> {
class myreceiver : public receiver<response_type> {
public:
myreceiver(client_type& db) : db_{&db} {}
private:
client_type* db_;
public:
receiver(client_type& db) : db_{&db} {}
void on_read(command cmd) override
void on_read_impl(command cmd) override
{
switch (cmd) {
case command::hello:
@@ -65,7 +63,7 @@ int main()
{
net::io_context ioc;
client_type db{ioc.get_executor()};
receiver recv{db};
myreceiver recv{db};
db.async_run(
recv,

View File

@@ -1,88 +0,0 @@
/* Copyright (c) 2019 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <iostream>
#include <vector>
#include <tuple>
#include <array>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "lib/net_utils.hpp"
namespace resp3 = aedis::resp3;
using aedis::redis::command;
using aedis::redis::make_serializer;
using resp3::adapt;
namespace net = aedis::net;
using net::async_write;
using net::buffer;
using net::dynamic_buffer;
// From lib/net_utils.hpp
using aedis::connect;
net::awaitable<void> transaction()
{
try {
auto socket = co_await connect();
auto list = {"one", "two", "three"};
std::string request;
auto sr = make_serializer(request);
sr.push(command::hello, 3);
sr.push(command::flushall);
sr.push(command::multi); // Starts a transaction
sr.push(command::ping, "Some message");
sr.push(command::incr, "incr1-key");
sr.push_range(command::rpush, "list-key", std::cbegin(list), std::cend(list));
sr.push(command::lrange, "list-key", 0, -1);
sr.push(command::incr, "incr2-key");
sr.push(command::exec); // Ends the transaction.
sr.push(command::quit);
co_await async_write(socket, buffer(request));
// Expected responses.
std::tuple<std::string, int, int, std::vector<std::string>, int> execs;
// Reads the response.
std::string buffer;
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // hello
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // flushall
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // multi
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // ping
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // incr
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // rpush
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // lrange
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // incr
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(execs));
co_await resp3::async_read(socket, dynamic_buffer(buffer)); // quit
// Prints the response to the transaction.
std::cout << "ping: " << std::get<0>(execs) << "\n";
std::cout << "incr1: " << std::get<1>(execs) << "\n";
std::cout << "rpush: " << std::get<2>(execs) << "\n";
std::cout << "lrange: ";
for (auto const& e: std::get<3>(execs)) std::cout << e << " ";
std::cout << "\n";
std::cout << "incr2: " << std::get<4>(execs) << "\n";
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
exit(EXIT_FAILURE);
}
}
int main()
{
net::io_context ioc;
co_spawn(ioc, transaction(), net::detached);
ioc.run();
}