2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Commit of the following:

- Adds sync class the offer a thread-safe and synchronous API.
- Fixes documentation of adapt functions.
- Removes compose.hpp header.
- Adds test to aedis::error and resp3::type.
- Simplifies some code.
This commit is contained in:
Marcelo Zimbres
2022-08-20 09:29:48 +02:00
parent a31d797e43
commit 225095944c
17 changed files with 539 additions and 441 deletions

View File

@@ -6,7 +6,8 @@ codecov:
wait_for_ci: yes
ignore:
- "**/benchmarks/*"
- "**/examples/*"
- "benchmarks/cpp/asio/*"
- "examples/*"
- "tests/*"
- "/usr/*"
- "**/boost/*"

View File

@@ -2,23 +2,27 @@
## master
* Makes all free functions from the `sync.hpp` member functions of the
`connection` class.
* Removes collision between `aedis::adapter::adapt` and
`aedis::adapt`.
* Fixes a bug on reconnect from a state where the `connection` object
had unsent commands. This would cause `async_exec` to never
complete.
* Adds new class `sync` that wraps a `connection` and offers a
thread-safe synchronous API. All free functions from the `sync.hpp`
are now member functions of the `sync` class.
* Split `connection::async_receive_event` in two functions, one to
receive events and other for server side pushes.
* Removes collision between `aedis::adapter::adapt` and
`aedis::adapt`.
* Adds `connection::operation` enum to replace `cancel_*` member
functions with a single cancel function that gets what should be
cancelled as argument.
* Bugfix: Documentation of `adapt()` functions were missing from
doxygen.
* Bugfix: a bug on reconnect from a state where the `connection` object
had unsent commands. It could cause `async_exec` to never
complete under certain conditions.
## v0.3.0
* Adds `experimental::exec` and `receive_event` functions to offer a

View File

@@ -6,7 +6,7 @@
#include <tuple>
#include <string>
#include <boost/asio.hpp>
#include <boost/asio/io_context.hpp>
#include <aedis.hpp>
// Include this in no more than one .cpp file.
@@ -15,18 +15,17 @@
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using connection = aedis::connection<>;
using connection = aedis::sync<aedis::connection<>>;
int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
auto work = net::make_work_guard(ioc);
std::thread t1{[&]() { ioc.run(); }};
std::thread thread{[&]() {
conn.async_run(net::detached);
ioc.run();
}};
connection conn{work.get_executor()};
std::thread t2{[&]() { boost::system::error_code ec; conn.run(ec); }};
request req;
req.push("PING");
@@ -34,9 +33,12 @@ int main()
std::tuple<std::string, aedis::ignore> resp;
conn.exec(req, adapt(resp));
thread.join();
std::cout << "Response: " << std::get<0>(resp) << std::endl;
work.reset();
t1.join();
t2.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}

View File

@@ -14,23 +14,15 @@
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::resp3::request;
using connection = aedis::connection<>;
using aedis::resp3::node;
using aedis::adapt;
using aedis::resp3::node;
using aedis::resp3::request;
using connection = aedis::sync<aedis::connection<>>;
using event = connection::event;
// See subscriber.cpp for more info about how to run this example.
void push_receiver(connection& conn)
{
for (std::vector<node<std::string>> resp;;) {
conn.receive_push(adapt(resp));
print_push(resp);
resp.clear();
}
}
// Subscribe again everytime there is a disconnection.
void event_receiver(connection& conn)
{
request req;
@@ -47,19 +39,26 @@ int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
auto work = net::make_work_guard(ioc);
conn.get_config().enable_events = true;
conn.get_config().enable_reconnect = true;
connection::config cfg;
cfg.enable_events = true;
cfg.enable_reconnect = true;
connection conn{work.get_executor(), cfg};
std::thread push_thread{[&]() { push_receiver(conn); }};
std::thread event_thread{[&]() { event_receiver(conn); }};
std::thread t1{[&]() { ioc.run(); }};
std::thread t2{[&]() { boost::system::error_code ec; conn.run(ec); }};
std::thread t3{[&]() { event_receiver(conn); }};
conn.async_run(net::detached);
ioc.run();
for (std::vector<node<std::string>> resp;;) {
conn.receive_push(adapt(resp));
print_push(resp);
resp.clear();
}
event_thread.join();
push_thread.join();
t1.join();
t2.join();
t3.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;

View File

@@ -12,7 +12,6 @@ nobase_include_HEADERS =\
$(top_srcdir)/include/aedis/adapter/adapt.hpp\
$(top_srcdir)/include/aedis/adapter/detail/response_traits.hpp\
$(top_srcdir)/include/aedis/resp3/node.hpp\
$(top_srcdir)/include/aedis/resp3/compose.hpp\
$(top_srcdir)/include/aedis/resp3/detail/read_ops.hpp\
$(top_srcdir)/include/aedis/resp3/detail/parser.hpp\
$(top_srcdir)/include/aedis/resp3/detail/exec.hpp\

View File

@@ -10,6 +10,7 @@
#include <aedis/error.hpp>
#include <aedis/adapt.hpp>
#include <aedis/connection.hpp>
#include <aedis/sync.hpp>
#include <aedis/resp3/request.hpp>
/** \mainpage Documentation

View File

@@ -20,12 +20,13 @@
namespace aedis {
/** @brief A type that ignores responses.
/** @brief Tag used tp ignore responses.
* @ingroup any
*
* For example
*
* @code
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
* std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
* @endcode
*
* will cause only the second tuple type to be parsed, the others
@@ -131,9 +132,10 @@ struct response_traits<std::tuple<Ts...>> {
} // detail
/** @brief Creates an adapter that ignores responses.
* @ingroup any
*
* This function can be used to create adapters that ignores
* responses. As a result it can improve performance.
* responses.
*/
auto adapt() noexcept
{
@@ -141,6 +143,7 @@ auto adapt() noexcept
}
/** @brief Adapts a type to be used as a response.
* @ingroup any
*
* The type T can be any STL container, any integer type and
* \c std::string

View File

@@ -13,7 +13,6 @@
#include <chrono>
#include <memory>
#include <type_traits>
#include <condition_variable>
#include <boost/assert.hpp>
#include <boost/asio/ip/tcp.hpp>
@@ -27,7 +26,7 @@
namespace aedis {
/** @brief A high level Redis connection class.
/** @brief A high level Redis asynchronous connection to Redis.
* @ingroup any
*
* This class keeps a healthy connection to the Redis instance where
@@ -99,8 +98,6 @@ public:
connect,
/// Success sending AUTH and HELLO.
hello,
/// Successful writing on a socket.
write,
/// Used internally.
invalid
};
@@ -394,174 +391,7 @@ public:
}
/// @}
/** @name Synchronous functions
**/
/// @{
/** @brief Executes a request.
*
* @remark This function will block until execution completes. It
* assumes the connection is running on a different thread.
*
* @param req The request.
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter>
std::size_t
exec(resp3::request const& req, ResponseAdapter adapter, boost::system::error_code& ec)
{
sync sh;
std::size_t res = 0;
auto f = [this, &ec, &res, &sh, &req, adapter]()
{
async_exec(req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Executes a command.
*
* @remark This function will block until execution completes. It
* assumes the connection is running on a different thread.
*
* @param req The request.
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter = detail::response_traits<void>::adapter_type>
std::size_t exec(resp3::request const& req, ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = exec(req, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Receives server pushes synchronusly.
*
* @remark This function will block until execution completes. It
* assumes the connection is running on a different thread.
*
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter>
auto receive_push(ResponseAdapter adapter, boost::system::error_code& ec)
{
sync sh;
std::size_t res = 0;
auto f = [this, &ec, &res, &sh, adapter]()
{
async_receive_push(adapter, [&ec, &res, &sh](auto const& e, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = e;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Receives server pushes synchronusly.
*
* @remark This function will block until execution completes. It
* assumes the connection is running on a different thread.
*
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
auto receive_push(ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = receive_push(adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Receives events
*
* @remark This function will block until execution completes. It
* assumes the connection is running on a different thread.
*
* @param ec Error code in case of error.
* @returns The event received.
*/
auto receive_event(boost::system::error_code& ec)
{
sync sh;
auto res = event::invalid;
auto f = [this, &ec, &res, &sh]()
{
async_receive_event([&ec, &res, &sh](auto const& ecp, event ev) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = ev;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Receives events
*
* @remark This function will block until execution completes. It
* assumes the connection is running on a different thread.
*
* @throws std::system_error in case of error.
* @returns The event received.
*/
auto receive_event()
{
boost::system::error_code ec;
auto const res = receive_event(ec);
if (ec)
throw std::system_error(ec);
return res;
}
/// @}
private:
struct sync {
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
};
struct req_info {
req_info(executor_type ex) : timer{ex} {}
timer_type timer;

View File

@@ -56,26 +56,18 @@ struct connect_op {
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, ep);
return;
}
} break;
case 0: self.complete(ec1, ep); break;
case 1:
{
if (!ec2) {
if (ec2)
self.complete({}, ep);
else
self.complete(error::connect_timeout, ep);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, ep);
}
}
};

View File

@@ -25,6 +25,9 @@ enum class error
/// Idle timeout.
idle_timeout,
/// Exec timeout.
exec_timeout,
/// Invalid RESP3 type.
invalid_data_type,

View File

@@ -22,6 +22,7 @@ struct error_category_impl : boost::system::error_category {
case error::resolve_timeout: return "Resolve operation timeout.";
case error::connect_timeout: return "Connect operation timeout.";
case error::idle_timeout: return "Idle timeout.";
case error::exec_timeout: return "Exec timeout.";
case error::invalid_data_type: return "Invalid resp3 type.";
case error::not_a_number: return "Can't convert string to number.";
case error::unexpected_read_size: return "Unexpected read size.";

View File

@@ -1,164 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_RESP3_COMPOSE_HPP
#define AEDIS_RESP3_COMPOSE_HPP
#include <string>
#include <tuple>
#include <boost/hana.hpp>
#include <boost/utility/string_view.hpp>
#include <aedis/resp3/type.hpp>
namespace aedis {
namespace resp3 {
constexpr char separator[] = "\r\n";
/** @brief Adds a bulk to the request.
* @ingroup any
*
* This function is useful in serialization of your own data
* structures in a request. For example
*
* @code
* void to_bulk(std::string& to, mystruct const& obj)
* {
* auto const str = // Convert obj to a string.
* resp3::to_bulk(to, str);
* }
* @endcode
*
* @param to Storage on which data will be copied into.
* @param data Data that will be serialized and stored in @c to.
*
* See more in \ref requests-serialization.
*/
template <class Request>
void to_bulk(Request& to, boost::string_view data)
{
auto const str = std::to_string(data.size());
to += to_code(type::blob_string);
to.append(std::cbegin(str), std::cend(str));
to += separator;
to.append(std::cbegin(data), std::cend(data));
to += separator;
}
template <class Request, class T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
void to_bulk(Request& to, T n)
{
auto const s = std::to_string(n);
to_bulk(to, boost::string_view{s});
}
namespace detail {
template <class T>
struct add_bulk_impl {
template <class Request>
static void add(Request& to, T const& from)
{
using namespace aedis::resp3;
to_bulk(to, from);
}
};
template <class U, class V>
struct add_bulk_impl<std::pair<U, V>> {
template <class Request>
static void add(Request& to, std::pair<U, V> const& from)
{
using namespace aedis::resp3;
to_bulk(to, from.first);
to_bulk(to, from.second);
}
};
template <class ...Ts>
struct add_bulk_impl<boost::hana::tuple<Ts...>> {
template <class Request>
static void add(Request& to, boost::hana::tuple<Ts...> const& from)
{
using boost::hana::for_each;
// Fold expressions is C++17 so we use hana.
//(resp3::add_bulk(*request_, args), ...);
for_each(from, [&](auto const& e) {
using namespace aedis::resp3;
to_bulk(to, e);
});
}
};
} // detail
/** \internal
* \brief Adds a resp3 header to the request.
* \ingroup any
*
* See mystruct.hpp for an example.
*/
template <class Request>
void add_header(Request& to, type t, std::size_t size)
{
auto const str = std::to_string(size);
to += to_code(t);
to.append(std::cbegin(str), std::cend(str));
to += separator;
}
/* Adds a rep3 bulk to the request.
*
* This function adds \c data as a bulk string to the request \c to.
*/
template <class Request, class T>
void add_bulk(Request& to, T const& data)
{
detail::add_bulk_impl<T>::add(to, data);
}
template <class>
struct bulk_counter;
template <class>
struct bulk_counter {
static constexpr auto size = 1U;
};
template <class T, class U>
struct bulk_counter<std::pair<T, U>> {
static constexpr auto size = 2U;
};
template <class Request>
void add_blob(Request& to, boost::string_view blob)
{
to.append(std::cbegin(blob), std::cend(blob));
to += separator;
}
/** \internal
* \brief Adds a separator to the request.
* \ingroup any
*
* See mystruct.hpp for an example.
*/
template <class Request>
void add_separator(Request& to)
{
to += separator;
}
} // resp3
} // aedis
#endif // AEDIS_RESP3_COMPOSE_HPP

View File

@@ -129,26 +129,18 @@ struct exec_with_timeout_op {
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, 0);
return;
}
} break;
case 0: self.complete(ec1, n); break;
case 1:
{
if (!ec2) {
self.complete(aedis::error::idle_timeout, 0);
return;
}
if (ec2)
self.complete({}, n);
else
self.complete(aedis::error::exec_timeout, 0);
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, n);
}
}
};

View File

@@ -12,27 +12,25 @@ namespace resp3 {
char const* to_string(type t)
{
static char const* table[] =
{ "array"
, "push"
, "set"
, "map"
, "attribute"
, "simple_string"
, "simple_error"
, "number"
, "doublean"
, "boolean"
, "big_number"
, "null"
, "blob_error"
, "verbatim_string"
, "blob_string"
, "streamed_string_part"
, "invalid"
};
return table[static_cast<int>(t)];
switch (t) {
case type::array: return "array";
case type::push: return "push";
case type::set: return "set";
case type::map: return "map";
case type::attribute: return "attribute";
case type::simple_string: return "simple_string";
case type::simple_error: return "simple_error";
case type::number: return "number";
case type::doublean: return "doublean";
case type::boolean: return "boolean";
case type::big_number: return "big_number";
case type::null: return "null";
case type::blob_error: return "blob_error";
case type::verbatim_string: return "verbatim_string";
case type::blob_string: return "blob_string";
case type::streamed_string_part: return "streamed_string_part";
default: return "invalid";
}
}
std::ostream& operator<<(std::ostream& os, type t)

View File

@@ -7,10 +7,14 @@
#ifndef AEDIS_RESP3_REQUEST_HPP
#define AEDIS_RESP3_REQUEST_HPP
#include <aedis/resp3/compose.hpp>
#include <string>
#include <tuple>
#include <boost/hana.hpp>
#include <boost/utility/string_view.hpp>
#include <aedis/resp3/type.hpp>
// NOTE: Consider detecting tuples in the type in the parameter pack
// to calculate the header size correctly.
//
@@ -19,10 +23,130 @@
namespace aedis {
namespace resp3 {
constexpr char separator[] = "\r\n";
/** @brief Adds a bulk to the request.
* @relates request
*
* This function is useful in serialization of your own data
* structures in a request. For example
*
* @code
* void to_bulk(std::string& to, mystruct const& obj)
* {
* auto const str = // Convert obj to a string.
* resp3::to_bulk(to, str);
* }
* @endcode
*
* @param to Storage on which data will be copied into.
* @param data Data that will be serialized and stored in @c to.
*
* See more in \ref requests-serialization.
*/
template <class Request>
void to_bulk(Request& to, boost::string_view data)
{
auto const str = std::to_string(data.size());
to += to_code(type::blob_string);
to.append(std::cbegin(str), std::cend(str));
to += separator;
to.append(std::cbegin(data), std::cend(data));
to += separator;
}
template <class Request, class T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
void to_bulk(Request& to, T n)
{
auto const s = std::to_string(n);
to_bulk(to, boost::string_view{s});
}
namespace detail {
bool has_push_response(boost::string_view cmd);
template <class T>
struct add_bulk_impl {
template <class Request>
static void add(Request& to, T const& from)
{
using namespace aedis::resp3;
to_bulk(to, from);
}
};
template <class U, class V>
struct add_bulk_impl<std::pair<U, V>> {
template <class Request>
static void add(Request& to, std::pair<U, V> const& from)
{
using namespace aedis::resp3;
to_bulk(to, from.first);
to_bulk(to, from.second);
}
};
template <class ...Ts>
struct add_bulk_impl<boost::hana::tuple<Ts...>> {
template <class Request>
static void add(Request& to, boost::hana::tuple<Ts...> const& from)
{
using boost::hana::for_each;
// Fold expressions is C++17 so we use hana.
//(detail::add_bulk(*request_, args), ...);
for_each(from, [&](auto const& e) {
using namespace aedis::resp3;
to_bulk(to, e);
});
}
};
template <class Request>
void add_header(Request& to, type t, std::size_t size)
{
auto const str = std::to_string(size);
to += to_code(t);
to.append(std::cbegin(str), std::cend(str));
to += separator;
}
template <class Request, class T>
void add_bulk(Request& to, T const& data)
{
detail::add_bulk_impl<T>::add(to, data);
}
template <class>
struct bulk_counter;
template <class>
struct bulk_counter {
static constexpr auto size = 1U;
};
template <class T, class U>
struct bulk_counter<std::pair<T, U>> {
static constexpr auto size = 2U;
};
template <class Request>
void add_blob(Request& to, boost::string_view blob)
{
to.append(std::cbegin(blob), std::cend(blob));
to += separator;
}
template <class Request>
void add_separator(Request& to)
{
to += separator;
}
} // detail
/** @brief Creates Redis requests.
@@ -83,9 +207,9 @@ public:
using resp3::type;
auto constexpr pack_size = sizeof...(Ts);
resp3::add_header(payload_, type::array, 1 + pack_size);
resp3::add_bulk(payload_, cmd);
resp3::add_bulk(payload_, make_tuple(args...));
detail::add_header(payload_, type::array, 1 + pack_size);
detail::add_bulk(payload_, cmd);
detail::add_bulk(payload_, make_tuple(args...));
if (!detail::has_push_response(cmd))
++commands_;
@@ -121,14 +245,14 @@ public:
if (begin == end)
return;
auto constexpr size = resp3::bulk_counter<value_type>::size;
auto constexpr size = detail::bulk_counter<value_type>::size;
auto const distance = std::distance(begin, end);
resp3::add_header(payload_, type::array, 2 + size * distance);
resp3::add_bulk(payload_, cmd);
resp3::add_bulk(payload_, key);
detail::add_header(payload_, type::array, 2 + size * distance);
detail::add_bulk(payload_, cmd);
detail::add_bulk(payload_, key);
for (; begin != end; ++begin)
resp3::add_bulk(payload_, *begin);
detail::add_bulk(payload_, *begin);
if (!detail::has_push_response(cmd))
++commands_;
@@ -160,13 +284,13 @@ public:
if (begin == end)
return;
auto constexpr size = resp3::bulk_counter<value_type>::size;
auto constexpr size = detail::bulk_counter<value_type>::size;
auto const distance = std::distance(begin, end);
resp3::add_header(payload_, type::array, 1 + size * distance);
resp3::add_bulk(payload_, cmd);
detail::add_header(payload_, type::array, 1 + size * distance);
detail::add_bulk(payload_, cmd);
for (; begin != end; ++begin)
resp3::add_bulk(payload_, *begin);
detail::add_bulk(payload_, *begin);
if (!detail::has_push_response(cmd))
++commands_;

245
include/aedis/sync.hpp Normal file
View File

@@ -0,0 +1,245 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_SYNC_HPP
#define AEDIS_SYNC_HPP
#include <condition_variable>
#include <aedis/resp3/request.hpp>
namespace aedis {
/** @brief A high level synchronous connection to Redis.
* @ingroup any
*
* This class keeps a healthy and thread safe connection to the Redis
* instance where commands can be sent at any time. For more details,
* please see the documentation of each individual function.
*
*/
template <class Connection>
class sync {
public:
using event = typename Connection::event;
using config = typename Connection::config;
/** @brief Constructor
*
* @param ex Executor
* @param cfg Config options.
*/
template <class Executor>
sync(Executor ex, config cfg = config{}) : conn_{ex, cfg} { }
/** @brief Executes a request synchronously.
*
* The functions calls `connections::async_receive_exec` and waits
* for its completion.
*
* @param req The request.
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter>
std::size_t
exec(resp3::request const& req, ResponseAdapter adapter, boost::system::error_code& ec)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, &ec, &res, &sh, &req, adapter]()
{
conn_.async_exec(req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Executes a command synchronously
*
* The functions calls `connections::async_exec` and waits for its
* completion.
*
* @param req The request.
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter = detail::response_traits<void>::adapter_type>
std::size_t exec(resp3::request const& req, ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = exec(req, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Receives server pushes synchronusly.
*
* The functions calls `connections::async_receive_push` and
* waits for its completion.
*
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes received.
*/
template <class ResponseAdapter>
auto receive_push(ResponseAdapter adapter, boost::system::error_code& ec)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, &ec, &res, &sh, adapter]()
{
conn_.async_receive_push(adapter, [&ec, &res, &sh](auto const& e, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = e;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Receives server pushes synchronusly.
*
* The functions calls `connections::async_receive_push` and
* waits for its completion.
*
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes received.
*/
template <class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
auto receive_push(ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = receive_push(adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Receives events synchronously.
*
* The functions calls `connections::async_receive_event` and
* waits for its completion.
*
* @param ec Error code in case of error.
* @returns The event received.
*/
auto receive_event(boost::system::error_code& ec)
{
sync_helper sh;
auto res = event::invalid;
auto f = [this, &ec, &res, &sh]()
{
conn_.async_receive_event([&ec, &res, &sh](auto const& ecp, event ev) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = ev;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Receives events synchronously
*
* The functions calls `connections::async_receive_event` and
* waits for its completion.
*
* @throws std::system_error in case of error.
* @returns The event received.
*/
auto receive_event()
{
boost::system::error_code ec;
auto const res = receive_event(ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Calls \c async_run from the underlying connection.
*
* The functions calls `connections::async_run` and waits for its
* completion.
*
* @param ec Error code.
*/
void run(boost::system::error_code& ec)
{
sync_helper sh;
auto f = [this, &ec, &sh]()
{
conn_.async_run([&ec, &sh](auto const& e) {
std::unique_lock ul(sh.mutex);
ec = e;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
}
/** @brief Calls \c async_run from the underlying connection.
*
* The functions calls `connections::async_run` and waits for its
* completion.
*
* @throws std::system_error.
*/
void run()
{
boost::system::error_code ec;
run(ec);
if (ec)
throw std::system_error(ec);
}
private:
struct sync_helper {
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
};
Connection conn_;
};
} // aedis
#endif // AEDIS_SYNC_HPP

View File

@@ -7,6 +7,7 @@
#include <map>
#include <iostream>
#include <optional>
#include <sstream>
#include <boost/system/errc.hpp>
#include <boost/asio/awaitable.hpp>
@@ -740,3 +741,70 @@ BOOST_AUTO_TEST_CASE(test_null)
ioc.run();
}
//-----------------------------------------------------------------------------------
void check_error(char const* name, aedis::error ev)
{
auto const ec = aedis::make_error_code(ev);
auto const& cat = ec.category();
BOOST_TEST(std::string(ec.category().name()) == name);
BOOST_TEST(!ec.message().empty());
BOOST_TEST(cat.equivalent(
static_cast<std::underlying_type<aedis::error>::type>(ev),
ec.category().default_error_condition(
static_cast<std::underlying_type<aedis::error>::type>(ev))));
BOOST_TEST(cat.equivalent(ec,
static_cast<std::underlying_type<aedis::error>::type>(ev)));
}
BOOST_AUTO_TEST_CASE(error)
{
check_error("aedis", aedis::error::resolve_timeout);
check_error("aedis", aedis::error::resolve_timeout);
check_error("aedis", aedis::error::connect_timeout);
check_error("aedis", aedis::error::idle_timeout);
check_error("aedis", aedis::error::exec_timeout);
check_error("aedis", aedis::error::invalid_data_type);
check_error("aedis", aedis::error::not_a_number);
check_error("aedis", aedis::error::unexpected_read_size);
check_error("aedis", aedis::error::exceeeds_max_nested_depth);
check_error("aedis", aedis::error::unexpected_bool_value);
check_error("aedis", aedis::error::empty_field);
check_error("aedis", aedis::error::expects_resp3_simple_type);
check_error("aedis", aedis::error::expects_resp3_aggregate);
check_error("aedis", aedis::error::expects_resp3_map);
check_error("aedis", aedis::error::expects_resp3_set);
check_error("aedis", aedis::error::nested_aggregate_unsupported);
check_error("aedis", aedis::error::simple_error);
check_error("aedis", aedis::error::blob_error);
check_error("aedis", aedis::error::incompatible_size);
check_error("aedis", aedis::error::not_a_double);
check_error("aedis", aedis::error::null);
}
std::string get_type_as_str(aedis::resp3::type t)
{
std::ostringstream ss;
ss << t;
return ss.str();
}
BOOST_AUTO_TEST_CASE(type)
{
BOOST_TEST(!get_type_as_str(aedis::resp3::type::array).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::push).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::set).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::map).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::attribute).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::simple_string).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::simple_error).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::number).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::doublean).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::boolean).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::big_number).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::null).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::blob_error).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::verbatim_string).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::blob_string).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::streamed_string_part).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::invalid).empty());
}