2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Adds support for tuple in the high level api.

This commit is contained in:
Marcelo Zimbres
2022-05-29 10:52:39 +02:00
parent ebef2f9e23
commit 74e0a6ca23
13 changed files with 193 additions and 81 deletions

View File

@@ -62,6 +62,7 @@ nobase_include_HEADERS =\
$(top_srcdir)/aedis/generic/connection.hpp\
$(top_srcdir)/aedis/generic/request.hpp\
$(top_srcdir)/aedis/generic/error.hpp\
$(top_srcdir)/aedis/generic/adapt.hpp\
$(top_srcdir)/aedis/generic/impl/error.ipp\
$(top_srcdir)/aedis/generic/detail/connection_ops.hpp\
$(top_srcdir)/aedis/sentinel/command.hpp\

View File

@@ -28,8 +28,7 @@ namespace detail {
* containers and C++ buil-in types.
*/
template <class ResponseType>
struct response_traits
{
struct response_traits {
using adapter_type = adapter::detail::wrapper<ResponseType>;
static auto adapt(ResponseType& r) noexcept { return adapter_type{&r}; }
};
@@ -38,31 +37,26 @@ template <class T>
using adapter_t = typename response_traits<T>::adapter_type;
template <class T>
struct response_traits<resp3::node<T>>
{
struct response_traits<resp3::node<T>> {
using response_type = resp3::node<T>;
using adapter_type = adapter::detail::general_simple<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <class String, class Allocator>
struct response_traits<std::vector<resp3::node<String>, Allocator>>
{
struct response_traits<std::vector<resp3::node<String>, Allocator>> {
using response_type = std::vector<resp3::node<String>, Allocator>;
using adapter_type = adapter::detail::general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
template <>
struct response_traits<void>
{
struct response_traits<void> {
using response_type = void;
using adapter_type = resp3::detail::ignore_response;
static auto adapt() noexcept { return adapter_type{}; }
};
namespace detail {
// Duplicated here to avoid circular include dependency.
template<class T>
auto internal_adapt(T& t) noexcept
@@ -144,13 +138,11 @@ public:
}
};
} // detail
template <class... Ts>
struct response_traits<std::tuple<Ts...>>
{
using response_type = std::tuple<Ts...>;
using adapter_type = detail::static_aggregate_adapter<response_type>;
using adapter_type = static_aggregate_adapter<response_type>;
static auto adapt(response_type& r) noexcept { return adapter_type{&r}; }
};

View File

@@ -16,6 +16,7 @@
#include <aedis/generic/error.hpp>
#include <aedis/generic/request.hpp>
#include <aedis/generic/connection.hpp>
#include <aedis/generic/adapt.hpp>
/** \mainpage Documentation
\tableofcontents

134
aedis/generic/adapt.hpp Normal file
View File

@@ -0,0 +1,134 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_GENERIC_ADAPT_HPP
#define AEDIS_GENERIC_ADAPT_HPP
#include <tuple>
#include <boost/mp11.hpp>
#include <boost/variant2.hpp>
#include <boost/utility/string_view.hpp>
#include <boost/system.hpp>
#include <aedis/resp3/node.hpp>
#include <aedis/adapter/adapt.hpp>
#include <aedis/adapter/detail/response_traits.hpp>
namespace aedis {
namespace generic {
namespace detail {
struct ignore_adapter {
template <class Command>
void
operator()(
std::size_t i,
Command cmd,
resp3::node<boost::string_view> const& nd,
boost::system::error_code& ec)
{
}
};
template <class Tuple>
class static_adapter {
private:
static constexpr auto size = std::tuple_size<Tuple>::value;
using adapter_tuple = boost::mp11::mp_transform<adapter::adapter_t, Tuple>;
using variant_type = boost::mp11::mp_rename<adapter_tuple, boost::variant2::variant>;
using adapters_array_type = std::array<variant_type, size>;
adapters_array_type adapters_;
public:
static_adapter(Tuple& r = nullptr)
{
adapter::detail::assigner<std::tuple_size<Tuple>::value - 1>::assign(adapters_, r);
}
template <class Command>
void
operator()(
std::size_t i,
Command cmd,
resp3::node<boost::string_view> const& nd,
boost::system::error_code& ec)
{
using boost::variant2::visit;
BOOST_ASSERT(i < adapters_.size());
visit([&](auto& arg){arg(nd, ec);}, adapters_.at(i));
}
};
template <class Vector>
class vector_adapter {
private:
using adapter_type = adapter::detail::response_traits<Vector>::adapter_type;
adapter_type adapter_;
public:
vector_adapter(Vector& v) : adapter_{adapter::adapt(v)} { }
template <class Command>
void
operator()(
std::size_t i,
Command cmd,
resp3::node<boost::string_view> const& nd,
boost::system::error_code& ec)
{
adapter_(nd, ec);
}
};
template <class>
struct response_traits;
template <>
struct response_traits<void> {
using response_type = void;
using adapter_type = detail::ignore_adapter;
static auto adapt() noexcept
{ return detail::ignore_adapter{}; }
};
template <class String, class Allocator>
struct response_traits<std::vector<resp3::node<String>, Allocator>> {
using response_type = std::vector<resp3::node<String>, Allocator>;
using adapter_type = vector_adapter<response_type>;
static auto adapt(response_type& v) noexcept
{ return adapter_type{v}; }
};
template <class ...Ts>
struct response_traits<std::tuple<Ts...>> {
using response_type = std::tuple<Ts...>;
using adapter_type = static_adapter<response_type>;
static auto adapt(response_type& r) noexcept
{ return adapter_type{r}; }
};
} // detail
auto adapt() noexcept
{
return detail::response_traits<void>::adapt();
}
template<class T>
auto adapt(T& t) noexcept
{
return detail::response_traits<T>::adapt(t);
}
} // generic
} // aedis
#endif // AEDIS_GENERIC_ADAPT_HPP

View File

@@ -25,12 +25,10 @@
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/node.hpp>
#include <aedis/redis/command.hpp>
#include <aedis/adapter/adapt.hpp>
#include <aedis/generic/adapt.hpp>
#include <aedis/generic/request.hpp>
#include <aedis/generic/detail/connection_ops.hpp>
// TODO: Don't pass pong to the adapter.
namespace aedis {
namespace generic {
@@ -45,10 +43,6 @@ namespace generic {
*/
template <class Command, class AsyncReadWriteStream = boost::asio::ip::tcp::socket>
class connection {
private:
using adapter_type = std::function<void(Command, resp3::node<boost::string_view> const&, boost::system::error_code&)>;
using adapter_type2 = std::function<void(resp3::node<boost::string_view> const&, boost::system::error_code&)>;
public:
/// Executor type.
using executor_type = typename AsyncReadWriteStream::executor_type;
@@ -193,12 +187,11 @@ public:
/** @brief Asynchrnously schedules a command for execution.
*/
template <
class Adapter = adapter::detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type,
std::enable_if_t<std::is_convertible<Adapter, adapter_type>::value, bool> = true>
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
auto async_exec(
request_type const& req,
Adapter adapter = adapter::adapt(),
Adapter adapter = generic::adapt(),
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
@@ -207,28 +200,10 @@ public:
>(detail::exec_op<connection, Adapter>{this, &req, adapter}, token, resv_);
}
template <
class Adapter = adapter::detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type,
std::enable_if_t<std::is_convertible<Adapter, adapter_type2>::value, bool> = true>
auto async_exec(
request_type const& req,
Adapter adapter = adapter::adapt(),
CompletionToken token = CompletionToken{})
{
auto wrap = [adapter]
(Command, resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable
{ adapter(nd, ec); };
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<connection, decltype(wrap)>{this, &req, wrap}, token, resv_);
}
/** @brief Receives events produced by the run operation.
*/
template <
class Adapter = adapter::detail::response_traits<void>::adapter_type,
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
auto
async_read_push(
@@ -238,7 +213,7 @@ public:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::read_push_op<connection, Command, Adapter>{this, adapter}, token, resv_);
>(detail::read_push_op<connection, Adapter>{this, adapter}, token, resv_);
}
/** @brief Closes the connection with the database.
@@ -263,11 +238,11 @@ public:
private:
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
template <class T, class U, class V> friend struct detail::read_push_op;
template <class T, class U> friend struct detail::reader_op;
template <class T, class U> friend struct detail::ping_op;
template <class T, class U> friend struct detail::run_op;
template <class T, class u> friend struct detail::exec_op;
template <class T, class U> friend struct detail::exec_op;
template <class T, class U> friend struct detail::read_push_op;
template <class T> friend struct detail::exec_internal_impl_op;
template <class T> friend struct detail::exec_internal_op;
template <class T> friend struct detail::write_op;

View File

@@ -22,7 +22,7 @@
#include <aedis/resp3/write.hpp>
#include <aedis/generic/error.hpp>
#include <aedis/redis/command.hpp>
#include <aedis/adapter/adapt.hpp>
#include <aedis/generic/adapt.hpp>
namespace aedis {
namespace generic {
@@ -116,7 +116,7 @@ struct exec_internal_op {
}
};
template <class Conn, class Command, class Adapter>
template <class Conn, class Adapter>
struct read_push_op {
Conn* cli;
Adapter adapter;
@@ -167,6 +167,7 @@ struct exec_op {
Adapter adapter;
std::shared_ptr<boost::asio::steady_timer> timer;
std::size_t read_size = 0;
std::size_t index = 0;
boost::asio::coroutine coro;
template <class Self>
@@ -198,12 +199,13 @@ struct exec_op {
cli->reqs_.pop_front(); // TODO: Recycle timers.
// If there is no ongoing push-read operation we can
// request the timer to proceed, otherwise we can just
// request the reader to proceed, otherwise we can just
// exit.
if (cli->waiting_pushes_ == 0) {
//std::cout << "exec_op: requesting read op to proceed." << std::endl;
cli->wait_read_timer_.cancel_one();
}
self.complete({}, 0);
return;
}
@@ -211,12 +213,16 @@ struct exec_op {
// Notice we use the front of the queue.
while (cli->reqs_.front().n_cmds != 0) {
BOOST_ASSERT(!cli->cmds_.empty());
yield
resp3::async_read(
*cli->socket_,
cli->make_dynamic_buffer(),
[adpt = adapter, cmd = cli->cmds_.front()] (resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable { adpt(cmd, nd, ec); },
[i = index, adpt = adapter, cmd = cli->cmds_.front()] (resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable { adpt(i, cmd, nd, ec); },
std::move(self));
++index;
if (ec) {
cli->close();
self.complete(ec, 0);
@@ -285,7 +291,7 @@ struct ping_op {
//std::cout << "ping_op: Sending a command." << std::endl;
cli->req_.clear();
cli->req_.push(Command::ping);
yield cli->async_exec(cli->req_, adapter::adapt(), std::move(self));
yield cli->async_exec(cli->req_, generic::adapt(), std::move(self));
if (ec) {
self.complete(ec);
return;

View File

@@ -44,7 +44,7 @@ PROJECT_NUMBER = "@PACKAGE_VERSION@"
# for a project that appears at the top of each page and should give viewer a
# quick idea about the purpose of the project. Keep the description short.
PROJECT_BRIEF = "Low level Redis client library"
PROJECT_BRIEF = "High level Redis client library"
# With the PROJECT_LOGO tag one can specify a logo or an icon that is included
# in the documentation. The maximum height of the logo should not exceed 55

View File

@@ -14,8 +14,9 @@
#include <aedis/src.hpp>
namespace net = boost::asio;
namespace generic = aedis::generic;
namespace adapter = aedis::adapter;
using aedis::resp3::node;
using aedis::adapter::adapt;
using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;
@@ -60,7 +61,7 @@ private:
request<command> req;
req.push(command::publish, "channel", msg);
req.push(command::incr, "chat-room-counter");
co_await db->async_exec(req, adapt(*resp), net::use_awaitable);
co_await db->async_exec(req, generic::adapt(*resp), net::use_awaitable);
std::cout << "Messsages so far: " << resp->at(1).value << std::endl;
resp->clear();
msg.erase(0, n);
@@ -107,7 +108,7 @@ reader(
std::shared_ptr<sessions_type> sessions)
{
for (;;) {
co_await db->async_read_push(adapt(*resp), net::use_awaitable);
co_await db->async_read_push(adapter::adapt(*resp), net::use_awaitable);
for (auto& session: *sessions)
session->deliver(resp->at(3).value);
@@ -147,7 +148,7 @@ int main()
// response.
request<command> req;
req.push(command::subscribe, "channel");
db->async_exec(req, adapt(), handler);
db->async_exec(req, generic::adapt(), handler);
auto resp = std::make_shared<response_type>();

View File

@@ -14,8 +14,8 @@
#include <aedis/src.hpp>
namespace net = boost::asio;
namespace generic = aedis::generic;
using aedis::resp3::node;
using aedis::adapter::adapt;
using aedis::generic::request;
using aedis::redis::command;
using response_type = std::vector<aedis::resp3::node<std::string>>;
@@ -37,7 +37,7 @@ echo_loop(
req.push(command::incr, "echo-server-counter");
req.push(command::set, "echo-server-key", msg);
req.push(command::get, "echo-server-key");
co_await db->async_exec(req, adapt(*resp), net::use_awaitable);
co_await db->async_exec(req, generic::adapt(*resp), net::use_awaitable);
resp->at(0).value += ": ";
resp->at(0).value += resp->at(2).value;
co_await net::async_write(socket, net::buffer(resp->at(0).value));

View File

@@ -14,8 +14,9 @@
#include <aedis/src.hpp>
namespace net = boost::asio;
namespace generic = aedis::generic;
using aedis::resp3::node;
using aedis::adapter::adapt;
using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;
@@ -29,13 +30,14 @@ int main()
req.push(command::ping, "Ping example");
req.push(command::quit);
std::string resp;
std::tuple<std::string, std::string> resp;
net::io_context ioc;
connection db{ioc};
db.async_exec(req, adapt(resp), handler);
db.async_exec(req, generic::adapt(resp), handler);
db.async_run(handler);
ioc.run();
std::cout << resp << std::endl;
std::cout << std::get<0>(resp) << std::endl;
std::cout << std::get<1>(resp) << std::endl;
}

View File

@@ -16,8 +16,7 @@
#include "print.hpp"
namespace net = boost::asio;
using aedis::adapter::adapt;
using aedis::adapter::adapter_t;
namespace adapter = aedis::adapter;
using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;
@@ -39,10 +38,10 @@ int main()
T2 resp2;
auto adapter =
[ a0 = adapt(resp0)
, a1 = adapt(resp1)
, a2 = adapt(resp2)
](command cmd, node_type const& nd, error_code& ec) mutable
[ a0 = adapter::adapt(resp0)
, a1 = adapter::adapt(resp1)
, a2 = adapter::adapt(resp2)
](std::size_t, command cmd, node_type const& nd, error_code& ec) mutable
{
switch (cmd) {
case command::lrange: a0(nd, ec); break;

View File

@@ -12,11 +12,10 @@
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::adapter::adapt;
namespace generic = aedis::generic;
using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;
using response_type = std::vector<aedis::resp3::node<std::string>>;
/* In this example we send a subscription to a channel and start
* reading server side messages indefinitely.
@@ -35,9 +34,10 @@ using response_type = std::vector<aedis::resp3::node<std::string>>;
net::awaitable<void> reader(std::shared_ptr<connection> db)
{
response_type resp;
std::vector<aedis::resp3::node<std::string>> resp;
for (;;) {
auto n = co_await db->async_read_push(adapt(resp), net::use_awaitable);
auto n = co_await db->async_read_push(aedis::adapter::adapt(resp), net::use_awaitable);
std::cout
<< "Size: " << n << "\n"
<< "Event: " << resp.at(1).value << "\n"
@@ -53,7 +53,7 @@ net::awaitable<void> subscriber(std::shared_ptr<connection> db)
{
request<command> req;
req.push(command::subscribe, "channel1", "channel2");
co_await db->async_exec(req, adapt(), net::use_awaitable);
co_await db->async_exec(req, generic::adapt(), net::use_awaitable);
}
auto handler = [](auto ec, auto...)

View File

@@ -20,8 +20,9 @@
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
namespace generic = aedis::generic;
namespace adapter = aedis::adapter;
using aedis::adapter::adapt;
using aedis::adapter::adapter_t;
using aedis::redis::command;
using aedis::resp3::node;
@@ -82,7 +83,7 @@ void test_quit()
request<command> req;
req.push(command::quit);
db->async_exec(req, adapt(), [](auto ec, auto r){
db->async_exec(req, generic::adapt(), [](auto ec, auto r){
expect_no_error(ec);
//expect_eq(w, 36UL);
//expect_eq(r, 152UL);
@@ -101,12 +102,12 @@ net::awaitable<void>
push_consumer3(std::shared_ptr<connection> db)
{
{
auto [ec, n] = co_await db->async_read_push(adapt(), as_tuple(net::use_awaitable));
auto [ec, n] = co_await db->async_read_push(adapter::adapt(), as_tuple(net::use_awaitable));
expect_no_error(ec);
}
{
auto [ec, n] = co_await db->async_read_push(adapt(), as_tuple(net::use_awaitable));
auto [ec, n] = co_await db->async_read_push(adapter::adapt(), as_tuple(net::use_awaitable));
expect_error(ec, boost::asio::error::operation_aborted);
}
}
@@ -120,7 +121,7 @@ void test_push()
req.push(command::subscribe, "channel");
req.push(command::quit);
db->async_exec(req, adapt(), [](auto ec, auto r){
db->async_exec(req, generic::adapt(), [](auto ec, auto r){
expect_no_error(ec);
//expect_eq(w, 68UL);
//expect_eq(r, 151UL);
@@ -142,7 +143,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
{
request<command> req;
req.push(command::quit);
db->async_exec(req, adapt(), [](auto ec, auto){
db->async_exec(req, generic::adapt(), [](auto ec, auto){
expect_no_error(ec);
});
@@ -153,7 +154,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
{
request<command> req;
req.push(command::quit);
db->async_exec(req, adapt(), [](auto ec, auto){
db->async_exec(req, generic::adapt(), [](auto ec, auto){
expect_no_error(ec);
});
@@ -187,7 +188,7 @@ void test_idle()
request<command> req;
req.push(command::client, "PAUSE", 5000);
db->async_exec(req, adapt(), [](auto ec, auto r){
db->async_exec(req, generic::adapt(), [](auto ec, auto r){
expect_no_error(ec);
});