mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Moves sync functions from experimental to connection and improves code coverage.
This commit is contained in:
@@ -5,11 +5,8 @@ codecov:
|
||||
after_n_builds: 1
|
||||
wait_for_ci: yes
|
||||
|
||||
# Change how pull request comments look
|
||||
comment:
|
||||
layout: "reach,diff,flags,files,footer"
|
||||
|
||||
ignore:
|
||||
- "benchmarks/*"
|
||||
- "**/benchmarks/*"
|
||||
- "**/examples/*"
|
||||
- "/usr/*"
|
||||
- "**/boost/*"
|
||||
|
||||
23
CHANGELOG.md
23
CHANGELOG.md
@@ -2,19 +2,22 @@
|
||||
|
||||
## master
|
||||
|
||||
* Removes collision between `aedis::adapter::adapt` and
|
||||
`aedis::adapt`.
|
||||
* Makes all free functions from the `sync.hpp` member functions of the
|
||||
`connection` class.
|
||||
|
||||
* Fixes a bug on reconnect from a state where the `connection`
|
||||
object had unsent commands. This would cause `async_exec` to never
|
||||
complete.
|
||||
* Removes collision between `aedis::adapter::adapt` and
|
||||
`aedis::adapt`.
|
||||
|
||||
* Split \c connection::async_receive_event in two functions, one to
|
||||
receive events and other for server side pushes.
|
||||
* Fixes a bug on reconnect from a state where the `connection` object
|
||||
had unsent commands. This would cause `async_exec` to never
|
||||
complete.
|
||||
|
||||
* Adds `connection::operation` enum to replace `cancel_*` member
|
||||
functions with a single cancel function that gets what should be
|
||||
cancelled as argument.
|
||||
* Split `connection::async_receive_event` in two functions, one to
|
||||
receive events and other for server side pushes.
|
||||
|
||||
* Adds `connection::operation` enum to replace `cancel_*` member
|
||||
functions with a single cancel function that gets what should be
|
||||
cancelled as argument.
|
||||
|
||||
## v0.3.0
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
#include <string>
|
||||
#include <boost/asio.hpp>
|
||||
#include <aedis.hpp>
|
||||
#include <aedis/experimental/sync.hpp>
|
||||
|
||||
// Include this in no more than one .cpp file.
|
||||
#include <aedis/src.hpp>
|
||||
@@ -16,7 +15,6 @@
|
||||
namespace net = boost::asio;
|
||||
using aedis::adapt;
|
||||
using aedis::resp3::request;
|
||||
using aedis::experimental::exec;
|
||||
using connection = aedis::connection<>;
|
||||
|
||||
int main()
|
||||
@@ -35,7 +33,7 @@ int main()
|
||||
req.push("QUIT");
|
||||
|
||||
std::tuple<std::string, aedis::ignore> resp;
|
||||
exec(conn, req, adapt(resp));
|
||||
conn.exec(req, adapt(resp));
|
||||
thread.join();
|
||||
|
||||
std::cout << "Response: " << std::get<0>(resp) << std::endl;
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
#include <string>
|
||||
#include <boost/asio.hpp>
|
||||
#include <aedis.hpp>
|
||||
#include <aedis/experimental/sync.hpp>
|
||||
#include "print.hpp"
|
||||
|
||||
// Include this in no more than one .cpp file.
|
||||
@@ -16,9 +15,6 @@
|
||||
|
||||
namespace net = boost::asio;
|
||||
using aedis::resp3::request;
|
||||
using aedis::experimental::exec;
|
||||
using aedis::experimental::receive_event;
|
||||
using aedis::experimental::receive_push;
|
||||
using connection = aedis::connection<>;
|
||||
using aedis::resp3::node;
|
||||
using aedis::adapt;
|
||||
@@ -29,7 +25,7 @@ using event = connection::event;
|
||||
void push_receiver(connection& conn)
|
||||
{
|
||||
for (std::vector<node<std::string>> resp;;) {
|
||||
receive_push(conn, adapt(resp));
|
||||
conn.receive_push(adapt(resp));
|
||||
print_push(resp);
|
||||
resp.clear();
|
||||
}
|
||||
@@ -41,9 +37,9 @@ void event_receiver(connection& conn)
|
||||
req.push("SUBSCRIBE", "channel");
|
||||
|
||||
for (;;) {
|
||||
auto ev = receive_event(conn);
|
||||
auto ev = conn.receive_event();
|
||||
if (ev == connection::event::hello)
|
||||
exec(conn, req);
|
||||
conn.exec(req);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ nobase_include_HEADERS =\
|
||||
$(top_srcdir)/include/aedis/adapt.hpp\
|
||||
$(top_srcdir)/include/aedis/detail/connection_ops.hpp\
|
||||
$(top_srcdir)/include/aedis.hpp\
|
||||
$(top_srcdir)/include/aedis/experimental/sync.hpp\
|
||||
$(top_srcdir)/include/aedis/adapter/detail/adapters.hpp\
|
||||
$(top_srcdir)/include/aedis/adapter/adapt.hpp\
|
||||
$(top_srcdir)/include/aedis/adapter/detail/response_traits.hpp\
|
||||
|
||||
@@ -146,7 +146,7 @@ public:
|
||||
return;
|
||||
|
||||
if (is_aggregate(n.data_type)) {
|
||||
ec = error::expects_simple_type;
|
||||
ec = error::expects_resp3_simple_type;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -175,14 +175,14 @@ public:
|
||||
|
||||
if (is_aggregate(nd.data_type)) {
|
||||
if (nd.data_type != resp3::type::set)
|
||||
ec = error::expects_set_type;
|
||||
ec = error::expects_resp3_set;
|
||||
return;
|
||||
}
|
||||
|
||||
BOOST_ASSERT(nd.aggregate_size == 1);
|
||||
|
||||
if (nd.depth < 1) {
|
||||
ec = error::expects_set_type;
|
||||
ec = error::expects_resp3_set;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -214,14 +214,14 @@ public:
|
||||
|
||||
if (is_aggregate(nd.data_type)) {
|
||||
if (element_multiplicity(nd.data_type) != 2)
|
||||
ec = error::expects_map_type;
|
||||
ec = error::expects_resp3_map;
|
||||
return;
|
||||
}
|
||||
|
||||
BOOST_ASSERT(nd.aggregate_size == 1);
|
||||
|
||||
if (nd.depth < 1) {
|
||||
ec = error::expects_map_type;
|
||||
ec = error::expects_resp3_map;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -294,7 +294,7 @@ public:
|
||||
}
|
||||
} else {
|
||||
if (i_ == -1) {
|
||||
ec = error::expects_aggregate_type;
|
||||
ec = error::expects_resp3_aggregate;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -324,7 +324,7 @@ struct list_impl {
|
||||
if (!is_aggregate(nd.data_type)) {
|
||||
BOOST_ASSERT(nd.aggregate_size == 1);
|
||||
if (nd.depth < 1) {
|
||||
ec = error::expects_aggregate_type;
|
||||
ec = error::expects_resp3_aggregate;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,10 +13,12 @@
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <condition_variable>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/bind_executor.hpp>
|
||||
#include <boost/asio/experimental/channel.hpp>
|
||||
|
||||
#include <aedis/adapt.hpp>
|
||||
@@ -152,6 +154,79 @@ public:
|
||||
/// Returns the executor.
|
||||
auto get_executor() {return resv_.get_executor();}
|
||||
|
||||
/** @brief Cancel operations.
|
||||
*
|
||||
* @li operation::exec: Cancels all operations started with \c async_exec.
|
||||
* @li operation::run: Cancels @c async_run. The prefered way to
|
||||
* close a connection is to set config::enable_reconnect to
|
||||
* false and send a \c quit command. Otherwise an unresponsive Redis server
|
||||
* will cause the idle-checks to kick in and lead to \c
|
||||
* async_run returning with idle_timeout. Calling \c
|
||||
* cancel(operation::run) directly should be seen as the last
|
||||
* option.
|
||||
* @li operation::receive_event: Cancels @c async_receive_event.
|
||||
*
|
||||
* @param op: The operation to be cancelled.
|
||||
* @returns The number of operations that have been canceled.
|
||||
*/
|
||||
std::size_t cancel(operation op)
|
||||
{
|
||||
switch (op) {
|
||||
case operation::exec:
|
||||
{
|
||||
for (auto& e: reqs_) {
|
||||
e->stop = true;
|
||||
e->timer.cancel_one();
|
||||
}
|
||||
|
||||
auto const ret = reqs_.size();
|
||||
reqs_ = {};
|
||||
return ret;
|
||||
}
|
||||
case operation::run:
|
||||
{
|
||||
if (socket_)
|
||||
socket_->close();
|
||||
|
||||
read_timer_.cancel();
|
||||
check_idle_timer_.cancel();
|
||||
writer_timer_.cancel();
|
||||
ping_timer_.cancel();
|
||||
|
||||
// Cancel own pings if there are any waiting.
|
||||
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
|
||||
return !ptr->req->close_on_run_completion;
|
||||
});
|
||||
|
||||
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
|
||||
ptr->stop = true;
|
||||
ptr->timer.cancel();
|
||||
});
|
||||
|
||||
reqs_.erase(point, std::end(reqs_));
|
||||
return 1U;
|
||||
}
|
||||
case operation::receive_event:
|
||||
{
|
||||
event_channel_.cancel();
|
||||
return 1U;
|
||||
}
|
||||
case operation::receive_push:
|
||||
{
|
||||
push_channel_.cancel();
|
||||
return 1U;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// Get the config object.
|
||||
config& get_config() noexcept { return cfg_;}
|
||||
|
||||
/// Gets the config object.
|
||||
config const& get_config() const noexcept { return cfg_;}
|
||||
|
||||
/** @name Asynchronous functions
|
||||
**/
|
||||
|
||||
@@ -319,80 +394,174 @@ public:
|
||||
}
|
||||
/// @}
|
||||
|
||||
/** @brief Cancel operations.
|
||||
/** @name Synchronous functions
|
||||
**/
|
||||
|
||||
/// @{
|
||||
/** @brief Executes a request.
|
||||
*
|
||||
* @li operation::exec: Cancels all operations started with \c async_exec.
|
||||
* @li operation::run: Cancels @c async_run. The prefered way to
|
||||
* close a connection is to set config::enable_reconnect to
|
||||
* false and send a \c quit command. Otherwise an unresponsive Redis server
|
||||
* will cause the idle-checks to kick in and lead to \c
|
||||
* async_run returning with idle_timeout. Calling \c
|
||||
* cancel(operation::run) directly should be seen as the last
|
||||
* option.
|
||||
* @li operation::receive_event: Cancels @c async_receive_event.
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param op: The operation to be cancelled.
|
||||
* @returns The number of operations that have been canceled.
|
||||
* @param req The request.
|
||||
* @param adapter The response adapter.
|
||||
* @param ec Error code in case of error.
|
||||
* @returns The number of bytes of the response.
|
||||
*/
|
||||
std::size_t cancel(operation op)
|
||||
template <class ResponseAdapter>
|
||||
std::size_t
|
||||
exec(resp3::request const& req, ResponseAdapter adapter, boost::system::error_code& ec)
|
||||
{
|
||||
switch (op) {
|
||||
case operation::exec:
|
||||
{
|
||||
for (auto& e: reqs_) {
|
||||
e->stop = true;
|
||||
e->timer.cancel_one();
|
||||
}
|
||||
sync sh;
|
||||
std::size_t res = 0;
|
||||
|
||||
auto const ret = reqs_.size();
|
||||
reqs_ = {};
|
||||
return ret;
|
||||
}
|
||||
case operation::run:
|
||||
{
|
||||
if (socket_)
|
||||
socket_->close();
|
||||
auto f = [this, &ec, &res, &sh, &req, adapter]()
|
||||
{
|
||||
async_exec(req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
|
||||
std::unique_lock ul(sh.mutex);
|
||||
ec = ecp;
|
||||
res = n;
|
||||
sh.ready = true;
|
||||
ul.unlock();
|
||||
sh.cv.notify_one();
|
||||
});
|
||||
};
|
||||
|
||||
read_timer_.cancel();
|
||||
check_idle_timer_.cancel();
|
||||
writer_timer_.cancel();
|
||||
ping_timer_.cancel();
|
||||
|
||||
// Cancel own pings if there are any waiting.
|
||||
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
|
||||
return !ptr->req->close_on_run_completion;
|
||||
});
|
||||
|
||||
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
|
||||
ptr->stop = true;
|
||||
ptr->timer.cancel();
|
||||
});
|
||||
|
||||
reqs_.erase(point, std::end(reqs_));
|
||||
return 1U;
|
||||
}
|
||||
case operation::receive_event:
|
||||
{
|
||||
event_channel_.cancel();
|
||||
return 1U;
|
||||
}
|
||||
case operation::receive_push:
|
||||
{
|
||||
push_channel_.cancel();
|
||||
return 1U;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
boost::asio::dispatch(boost::asio::bind_executor(get_executor(), f));
|
||||
std::unique_lock lk(sh.mutex);
|
||||
sh.cv.wait(lk, [&sh]{return sh.ready;});
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Get the config object.
|
||||
config& get_config() noexcept { return cfg_;}
|
||||
/** @brief Executes a command.
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param req The request.
|
||||
* @param adapter The response adapter.
|
||||
* @throws std::system_error in case of error.
|
||||
* @returns The number of bytes of the response.
|
||||
*/
|
||||
template <class ResponseAdapter = detail::response_traits<void>::adapter_type>
|
||||
std::size_t exec(resp3::request const& req, ResponseAdapter adapter = aedis::adapt())
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto const res = exec(req, adapter, ec);
|
||||
if (ec)
|
||||
throw std::system_error(ec);
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Gets the config object.
|
||||
config const& get_config() const noexcept { return cfg_;}
|
||||
/** @brief Receives server pushes synchronusly.
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param adapter The response adapter.
|
||||
* @param ec Error code in case of error.
|
||||
* @returns The number of bytes of the response.
|
||||
*/
|
||||
template <class ResponseAdapter>
|
||||
auto receive_push(ResponseAdapter adapter, boost::system::error_code& ec)
|
||||
{
|
||||
sync sh;
|
||||
std::size_t res = 0;
|
||||
|
||||
auto f = [this, &ec, &res, &sh, adapter]()
|
||||
{
|
||||
async_receive_push(adapter, [&ec, &res, &sh](auto const& e, std::size_t n) {
|
||||
std::unique_lock ul(sh.mutex);
|
||||
ec = e;
|
||||
res = n;
|
||||
sh.ready = true;
|
||||
ul.unlock();
|
||||
sh.cv.notify_one();
|
||||
});
|
||||
};
|
||||
|
||||
boost::asio::dispatch(boost::asio::bind_executor(get_executor(), f));
|
||||
std::unique_lock lk(sh.mutex);
|
||||
sh.cv.wait(lk, [&sh]{return sh.ready;});
|
||||
return res;
|
||||
}
|
||||
|
||||
/** @brief Receives server pushes synchronusly.
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param adapter The response adapter.
|
||||
* @throws std::system_error in case of error.
|
||||
* @returns The number of bytes of the response.
|
||||
*/
|
||||
template <class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
|
||||
auto receive_push(ResponseAdapter adapter = aedis::adapt())
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto const res = receive_push(adapter, ec);
|
||||
if (ec)
|
||||
throw std::system_error(ec);
|
||||
return res;
|
||||
}
|
||||
|
||||
/** @brief Receives events
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param ec Error code in case of error.
|
||||
* @returns The event received.
|
||||
*/
|
||||
auto receive_event(boost::system::error_code& ec)
|
||||
{
|
||||
sync sh;
|
||||
auto res = event::invalid;
|
||||
|
||||
auto f = [this, &ec, &res, &sh]()
|
||||
{
|
||||
async_receive_event([&ec, &res, &sh](auto const& ecp, event ev) {
|
||||
std::unique_lock ul(sh.mutex);
|
||||
ec = ecp;
|
||||
res = ev;
|
||||
sh.ready = true;
|
||||
ul.unlock();
|
||||
sh.cv.notify_one();
|
||||
});
|
||||
};
|
||||
|
||||
boost::asio::dispatch(boost::asio::bind_executor(get_executor(), f));
|
||||
std::unique_lock lk(sh.mutex);
|
||||
sh.cv.wait(lk, [&sh]{return sh.ready;});
|
||||
return res;
|
||||
}
|
||||
|
||||
/** @brief Receives events
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @throws std::system_error in case of error.
|
||||
* @returns The event received.
|
||||
*/
|
||||
auto receive_event()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto const res = receive_event(ec);
|
||||
if (ec)
|
||||
throw std::system_error(ec);
|
||||
return res;
|
||||
}
|
||||
|
||||
/// @}
|
||||
|
||||
private:
|
||||
struct sync {
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
bool ready = false;
|
||||
};
|
||||
|
||||
struct req_info {
|
||||
req_info(executor_type ex) : timer{ex} {}
|
||||
timer_type timer;
|
||||
|
||||
@@ -44,16 +44,16 @@ enum class error
|
||||
empty_field,
|
||||
|
||||
/// Expects a simple RESP3 type but got an aggregate.
|
||||
expects_simple_type,
|
||||
expects_resp3_simple_type,
|
||||
|
||||
/// Expects aggregate type.
|
||||
expects_aggregate_type,
|
||||
/// Expects aggregate.
|
||||
expects_resp3_aggregate,
|
||||
|
||||
/// Expects a map but got other aggregate.
|
||||
expects_map_type,
|
||||
expects_resp3_map,
|
||||
|
||||
/// Expects a set aggregate but got something else.
|
||||
expects_set_type,
|
||||
expects_resp3_set,
|
||||
|
||||
/// Nested response not supported.
|
||||
nested_aggregate_unsupported,
|
||||
@@ -70,7 +70,7 @@ enum class error
|
||||
/// Not a double
|
||||
not_a_double,
|
||||
|
||||
/// Got RESP3 null type.
|
||||
/// Got RESP3 null.
|
||||
null
|
||||
};
|
||||
|
||||
|
||||
@@ -1,220 +0,0 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#ifndef AEDIS_EXPERIMENTAL_SYNC_HPP
|
||||
#define AEDIS_EXPERIMENTAL_SYNC_HPP
|
||||
|
||||
#include <aedis/adapt.hpp>
|
||||
#include <aedis/connection.hpp>
|
||||
#include <aedis/resp3/request.hpp>
|
||||
|
||||
namespace aedis {
|
||||
namespace experimental {
|
||||
namespace detail {
|
||||
|
||||
struct sync {
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
bool ready = false;
|
||||
};
|
||||
|
||||
} // detail
|
||||
|
||||
/** @brief Executes a request.
|
||||
* @ingroup any
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param conn The connection.
|
||||
* @param req The request.
|
||||
* @param adapter The response adapter.
|
||||
* @param ec Error code in case of error.
|
||||
* @returns The number of bytes of the response.
|
||||
*/
|
||||
template <class Connection, class ResponseAdapter>
|
||||
std::size_t
|
||||
exec(
|
||||
Connection& conn,
|
||||
resp3::request const& req,
|
||||
ResponseAdapter adapter,
|
||||
boost::system::error_code& ec)
|
||||
{
|
||||
detail::sync sh;
|
||||
std::size_t res = 0;
|
||||
|
||||
auto f = [&conn, &ec, &res, &sh, &req, adapter]()
|
||||
{
|
||||
conn.async_exec(req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
|
||||
std::unique_lock ul(sh.mutex);
|
||||
ec = ecp;
|
||||
res = n;
|
||||
sh.ready = true;
|
||||
ul.unlock();
|
||||
sh.cv.notify_one();
|
||||
});
|
||||
};
|
||||
|
||||
boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f));
|
||||
std::unique_lock lk(sh.mutex);
|
||||
sh.cv.wait(lk, [&sh]{return sh.ready;});
|
||||
return res;
|
||||
}
|
||||
|
||||
/** @brief Executes a command.
|
||||
* @ingroup any
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param conn The connection.
|
||||
* @param req The request.
|
||||
* @param adapter The response adapter.
|
||||
* @throws std::system_error in case of error.
|
||||
* @returns The number of bytes of the response.
|
||||
*/
|
||||
template <
|
||||
class Connection,
|
||||
class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
|
||||
std::size_t
|
||||
exec(
|
||||
Connection& conn,
|
||||
resp3::request const& req,
|
||||
ResponseAdapter adapter = aedis::adapt())
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto const res = exec(conn, req, adapter, ec);
|
||||
if (ec)
|
||||
throw std::system_error(ec);
|
||||
return res;
|
||||
}
|
||||
|
||||
/** @brief Receives server pushes synchronusly.
|
||||
* @ingroup any
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param conn The connection.
|
||||
* @param adapter The response adapter.
|
||||
* @param ec Error code in case of error.
|
||||
* @returns The number of bytes of the response.
|
||||
*/
|
||||
template <class Connection, class ResponseAdapter>
|
||||
auto receive_push(
|
||||
Connection& conn,
|
||||
ResponseAdapter adapter,
|
||||
boost::system::error_code& ec)
|
||||
{
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
bool ready = false;
|
||||
std::size_t n = 0;
|
||||
|
||||
auto f = [&conn, &ec, &mutex, &cv, &n, &ready, adapter]()
|
||||
{
|
||||
conn.async_receive_push(adapter, [&cv, &mutex, &n, &ready, &ec](auto const& ecp, std::size_t evp) {
|
||||
std::unique_lock ul(mutex);
|
||||
ec = ecp;
|
||||
n = evp;
|
||||
ready = true;
|
||||
ul.unlock();
|
||||
cv.notify_one();
|
||||
});
|
||||
};
|
||||
|
||||
boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f));
|
||||
std::unique_lock lk(mutex);
|
||||
cv.wait(lk, [&ready]{return ready;});
|
||||
return n;
|
||||
}
|
||||
|
||||
/** @brief Receives server pushes synchronusly.
|
||||
* @ingroup any
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param conn The connection.
|
||||
* @param adapter The response adapter.
|
||||
* @throws std::system_error in case of error.
|
||||
* @returns The number of bytes of the response.
|
||||
*/
|
||||
template <
|
||||
class Connection,
|
||||
class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
|
||||
auto receive_push(
|
||||
Connection& conn,
|
||||
ResponseAdapter adapter = aedis::adapt())
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto const res = receive_push(conn, adapter, ec);
|
||||
if (ec)
|
||||
throw std::system_error(ec);
|
||||
return res;
|
||||
}
|
||||
|
||||
/** @brief Receives events
|
||||
* @ingroup any
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param conn The connection.
|
||||
* @param ec Error code in case of error.
|
||||
* @returns The event received.
|
||||
*/
|
||||
template <class Connection>
|
||||
auto receive_event(Connection& conn, boost::system::error_code& ec)
|
||||
{
|
||||
using event_type = typename Connection::event;
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
bool ready = false;
|
||||
event_type ev = event_type::invalid;
|
||||
|
||||
auto f = [&conn, &ec, &ev, &mutex, &cv, &ready]()
|
||||
{
|
||||
conn.async_receive_event([&cv, &mutex, &ready, &ev, &ec](auto const& ecp, event_type evp) {
|
||||
std::unique_lock ul(mutex);
|
||||
ec = ecp;
|
||||
ev = evp;
|
||||
ready = true;
|
||||
ul.unlock();
|
||||
cv.notify_one();
|
||||
});
|
||||
};
|
||||
|
||||
boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f));
|
||||
std::unique_lock lk(mutex);
|
||||
cv.wait(lk, [&ready]{return ready;});
|
||||
return ev;
|
||||
}
|
||||
|
||||
/** @brief Receives events
|
||||
* @ingroup any
|
||||
*
|
||||
* @remark This function will block until execution completes. It
|
||||
* assumes the connection is running on a different thread.
|
||||
*
|
||||
* @param conn The connection.
|
||||
* @throws std::system_error in case of error.
|
||||
* @returns The event received.
|
||||
*/
|
||||
template <class Connection>
|
||||
auto receive_event(Connection& conn)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto const res = receive_event(conn, ec);
|
||||
if (ec)
|
||||
throw std::system_error(ec);
|
||||
return res;
|
||||
}
|
||||
|
||||
} // experimental
|
||||
} // aedis
|
||||
|
||||
#endif // AEDIS_EXPERIMENTAL_SYNC_HPP
|
||||
@@ -28,10 +28,10 @@ struct error_category_impl : boost::system::error_category {
|
||||
case error::exceeeds_max_nested_depth: return "Exceeds the maximum number of nested responses.";
|
||||
case error::unexpected_bool_value: return "Unexpected bool value.";
|
||||
case error::empty_field: return "Expected field value is empty.";
|
||||
case error::expects_simple_type: return "Expects a simple RESP3 type.";
|
||||
case error::expects_aggregate_type: return "Expects aggregate type.";
|
||||
case error::expects_map_type: return "Expects map type.";
|
||||
case error::expects_set_type: return "Expects set type.";
|
||||
case error::expects_resp3_simple_type: return "Expects a resp3 simple type.";
|
||||
case error::expects_resp3_aggregate: return "Expects resp3 aggregate.";
|
||||
case error::expects_resp3_map: return "Expects resp3 map.";
|
||||
case error::expects_resp3_set: return "Expects resp3 set.";
|
||||
case error::nested_aggregate_unsupported: return "Nested aggregate unsupported.";
|
||||
case error::simple_error: return "Got RESP3 simple-error.";
|
||||
case error::blob_error: return "Got RESP3 blob-error.";
|
||||
|
||||
@@ -110,12 +110,13 @@ BOOST_AUTO_TEST_CASE(test_number)
|
||||
auto const in03 = expect<int>{":11\r\n", int{11}, "number.int"};
|
||||
auto const in04 = expect<boost::optional<int>>{":11\r\n", ok, "number.optional.int"};
|
||||
auto const in05 = expect<std::tuple<int>>{"*1\r\n:11\r\n", std::tuple<int>{11}, "number.tuple.int"};
|
||||
auto const in06 = expect<boost::optional<int>>{"%11\r\n", boost::optional<int>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_simple_type)};
|
||||
auto const in07 = expect<std::set<std::string>>{":11\r\n", std::set<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_set_type)};
|
||||
auto const in08 = expect<std::unordered_set<std::string>>{":11\r\n", std::unordered_set<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_set_type)};
|
||||
auto const in09 = expect<std::map<std::string, std::string>>{":11\r\n", std::map<std::string, std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_map_type)};
|
||||
auto const in10 = expect<std::unordered_map<std::string, std::string>>{":11\r\n", std::unordered_map<std::string, std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_map_type)};
|
||||
auto const in11 = expect<std::list<std::string>>{":11\r\n", std::list<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_aggregate_type)};
|
||||
auto const in06 = expect<int>{"_\r\n", int{0}, "number.int", aedis::make_error_code(aedis::error::null)};
|
||||
auto const in07 = expect<boost::optional<int>>{"%11\r\n", boost::optional<int>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_simple_type)};
|
||||
auto const in08 = expect<std::set<std::string>>{":11\r\n", std::set<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_set)};
|
||||
auto const in09 = expect<std::unordered_set<std::string>>{":11\r\n", std::unordered_set<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_set)};
|
||||
auto const in10 = expect<std::map<std::string, std::string>>{":11\r\n", std::map<std::string, std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_map)};
|
||||
auto const in11 = expect<std::unordered_map<std::string, std::string>>{":11\r\n", std::unordered_map<std::string, std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_map)};
|
||||
auto const in12 = expect<std::list<std::string>>{":11\r\n", std::list<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_aggregate)};
|
||||
|
||||
auto ex = ioc.get_executor();
|
||||
|
||||
@@ -130,6 +131,7 @@ BOOST_AUTO_TEST_CASE(test_number)
|
||||
test_sync(ex, in09);
|
||||
test_sync(ex, in10);
|
||||
test_sync(ex, in11);
|
||||
test_sync(ex, in12);
|
||||
|
||||
test_async(ex, in01);
|
||||
test_async(ex, in02);
|
||||
@@ -142,6 +144,7 @@ BOOST_AUTO_TEST_CASE(test_number)
|
||||
test_async(ex, in09);
|
||||
test_async(ex, in10);
|
||||
test_async(ex, in11);
|
||||
test_async(ex, in12);
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
@@ -160,10 +163,10 @@ BOOST_AUTO_TEST_CASE(test_bool)
|
||||
|
||||
// Error
|
||||
auto const in01 = expect<boost::optional<bool>>{"#11\r\n", boost::optional<bool>{}, "bool.error", aedis::make_error_code(aedis::error::unexpected_bool_value)};
|
||||
auto const in03 = expect<std::set<int>>{"#t\r\n", std::set<int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_set_type)};
|
||||
auto const in04 = expect<std::unordered_set<int>>{"#t\r\n", std::unordered_set<int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_set_type)};
|
||||
auto const in05 = expect<std::map<int, int>>{"#t\r\n", std::map<int, int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_map_type)};
|
||||
auto const in06 = expect<std::unordered_map<int, int>>{"#t\r\n", std::unordered_map<int, int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_map_type)};
|
||||
auto const in03 = expect<std::set<int>>{"#t\r\n", std::set<int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_resp3_set)};
|
||||
auto const in04 = expect<std::unordered_set<int>>{"#t\r\n", std::unordered_set<int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_resp3_set)};
|
||||
auto const in05 = expect<std::map<int, int>>{"#t\r\n", std::map<int, int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_resp3_map)};
|
||||
auto const in06 = expect<std::unordered_map<int, int>>{"#t\r\n", std::unordered_map<int, int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_resp3_map)};
|
||||
|
||||
auto ex = ioc.get_executor();
|
||||
|
||||
@@ -259,6 +262,7 @@ BOOST_AUTO_TEST_CASE(test_map)
|
||||
using op_vec_type = boost::optional<std::vector<std::string>>;
|
||||
using tuple_type = std::tuple<std::string, std::string, std::string, std::string, std::string, std::string, std::string, std::string>;
|
||||
|
||||
std::string const wire2 = "*3\r\n$2\r\n11\r\n$2\r\n22\r\n$1\r\n3\r\n";
|
||||
std::string const wire = "%4\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n$4\r\nkey3\r\n$6\r\nvalue3\r\n$4\r\nkey3\r\n$6\r\nvalue3\r\n";
|
||||
|
||||
std::vector<node_type> expected_1a
|
||||
@@ -329,8 +333,10 @@ BOOST_AUTO_TEST_CASE(test_map)
|
||||
auto const in07 = expect<op_map_type>{wire, expected_1d, "map.optional.map"};
|
||||
auto const in08 = expect<op_vec_type>{wire, expected_1e, "map.optional.vector"};
|
||||
auto const in09 = expect<std::tuple<op_map_type>>{"*1\r\n" + wire, std::tuple<op_map_type>{expected_1d}, "map.transaction.optional.map"};
|
||||
auto const in10 = expect<int>{"%11\r\n", int{}, "map.invalid.int", aedis::make_error_code(aedis::error::expects_simple_type)};
|
||||
auto const in10 = expect<int>{"%11\r\n", int{}, "map.invalid.int", aedis::make_error_code(aedis::error::expects_resp3_simple_type)};
|
||||
auto const in11 = expect<tuple_type>{wire, e1f, "map.tuple"};
|
||||
auto const in12 = expect<map_type>{wire2, map_type{}, "map.error", aedis::make_error_code(aedis::error::expects_resp3_map)};
|
||||
auto const in13 = expect<map_type>{"_\r\n", map_type{}, "map.null", aedis::make_error_code(aedis::error::null)};
|
||||
|
||||
auto ex = ioc.get_executor();
|
||||
|
||||
@@ -345,6 +351,8 @@ BOOST_AUTO_TEST_CASE(test_map)
|
||||
test_sync(ex, in09);
|
||||
test_sync(ex, in00);
|
||||
test_sync(ex, in11);
|
||||
test_sync(ex, in12);
|
||||
test_sync(ex, in13);
|
||||
|
||||
test_async(ex, in00);
|
||||
test_async(ex, in01);
|
||||
@@ -357,6 +365,8 @@ BOOST_AUTO_TEST_CASE(test_map)
|
||||
test_async(ex, in09);
|
||||
test_async(ex, in00);
|
||||
test_async(ex, in11);
|
||||
test_async(ex, in12);
|
||||
test_async(ex, in13);
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
@@ -416,6 +426,8 @@ BOOST_AUTO_TEST_CASE(test_array)
|
||||
auto const in06 = expect<std::array<int, 3>>{wire, e1f, "array.array"};
|
||||
auto const in07 = expect<std::list<int>>{wire, e1g, "array.list"};
|
||||
auto const in08 = expect<std::deque<int>>{wire, e1h, "array.deque"};
|
||||
auto const in09 = expect<std::vector<int>>{"_\r\n", std::vector<int>{}, "array.vector", aedis::make_error_code(aedis::error::null)};
|
||||
auto const in10 = expect<std::list<int>>{"_\r\n", std::list<int>{}, "array.list", aedis::make_error_code(aedis::error::null)};
|
||||
|
||||
auto ex = ioc.get_executor();
|
||||
|
||||
@@ -427,6 +439,7 @@ BOOST_AUTO_TEST_CASE(test_array)
|
||||
test_sync(ex, in06);
|
||||
test_sync(ex, in07);
|
||||
test_sync(ex, in08);
|
||||
test_sync(ex, in09);
|
||||
|
||||
test_async(ex, in01);
|
||||
test_async(ex, in02);
|
||||
@@ -436,6 +449,7 @@ BOOST_AUTO_TEST_CASE(test_array)
|
||||
test_async(ex, in06);
|
||||
test_async(ex, in07);
|
||||
test_async(ex, in08);
|
||||
test_async(ex, in09);
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
@@ -449,7 +463,9 @@ BOOST_AUTO_TEST_CASE(test_set)
|
||||
using vec_type = std::vector<std::string>;
|
||||
using op_vec_type = boost::optional<std::vector<std::string>>;
|
||||
|
||||
std::string const wire2 = "*3\r\n$2\r\n11\r\n$2\r\n22\r\n$1\r\n3\r\n";
|
||||
std::string const wire = "~6\r\n+orange\r\n+apple\r\n+one\r\n+two\r\n+three\r\n+orange\r\n";
|
||||
|
||||
std::vector<node_type> const expected1a
|
||||
{ {resp3::type::set, 6UL, 0UL, {}}
|
||||
, {resp3::type::simple_string, 1UL, 1UL, {"orange"}}
|
||||
@@ -476,6 +492,7 @@ BOOST_AUTO_TEST_CASE(test_set)
|
||||
auto const in06 = expect<uset_type>{wire, e1c, "set.unordered_set"};
|
||||
auto const in07 = expect<muset_type>{wire, e1g, "set.unordered_multiset"};
|
||||
auto const in08 = expect<std::tuple<uset_type>>{"*1\r\n" + wire, std::tuple<uset_type>{e1c}, "set.tuple"};
|
||||
auto const in09 = expect<set_type>{wire2, set_type{}, "set.error", aedis::make_error_code(aedis::error::expects_resp3_set)};
|
||||
|
||||
auto ex = ioc.get_executor();
|
||||
|
||||
@@ -488,6 +505,7 @@ BOOST_AUTO_TEST_CASE(test_set)
|
||||
test_sync(ex, in06);
|
||||
test_sync(ex, in07);
|
||||
test_sync(ex, in08);
|
||||
test_sync(ex, in09);
|
||||
|
||||
test_async(ex, in00);
|
||||
test_async(ex, in01);
|
||||
@@ -498,6 +516,7 @@ BOOST_AUTO_TEST_CASE(test_set)
|
||||
test_async(ex, in06);
|
||||
test_async(ex, in07);
|
||||
test_async(ex, in08);
|
||||
test_async(ex, in09);
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
@@ -553,11 +572,11 @@ BOOST_AUTO_TEST_CASE(test_blob_string)
|
||||
BOOST_AUTO_TEST_CASE(test_double)
|
||||
{
|
||||
net::io_context ioc;
|
||||
// TODO: Add test for double.
|
||||
auto const in01 = expect<node_type>{",1.23\r\n", node_type{resp3::type::doublean, 1UL, 0UL, {"1.23"}}, "double.node"};
|
||||
auto const in02 = expect<node_type>{",inf\r\n", node_type{resp3::type::doublean, 1UL, 0UL, {"inf"}}, "double.node (inf)"};
|
||||
auto const in03 = expect<node_type>{",-inf\r\n", node_type{resp3::type::doublean, 1UL, 0UL, {"-inf"}}, "double.node (-inf)"};
|
||||
auto const in04 = expect<double>{",1.23\r\n", double{1.23}, "double.double"};
|
||||
auto const in05 = expect<double>{",er\r\n", double{0}, "double.double", aedis::make_error_code(aedis::error::not_a_double)};
|
||||
|
||||
auto ex = ioc.get_executor();
|
||||
|
||||
@@ -565,11 +584,13 @@ BOOST_AUTO_TEST_CASE(test_double)
|
||||
test_sync(ex, in02);
|
||||
test_sync(ex, in03);
|
||||
test_sync(ex, in04);
|
||||
test_sync(ex, in05);
|
||||
|
||||
test_async(ex, in01);
|
||||
test_async(ex, in02);
|
||||
test_async(ex, in03);
|
||||
test_async(ex, in04);
|
||||
test_async(ex, in05);
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user