2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Removes the sync wrapper.

This commit is contained in:
Marcelo Zimbres
2022-09-29 23:15:50 +02:00
parent 3a03a43c06
commit dff8833fe3
12 changed files with 31 additions and 567 deletions

View File

@@ -35,7 +35,7 @@ jobs:
uses: MarkusJx/install-boost@v2.3.0
id: install-boost
with:
boost_version: 1.79.0
boost_version: 1.80.0
platform_version: 22.04
- name: Run CMake
run: |

View File

@@ -30,7 +30,7 @@ jobs:
uses: MarkusJx/install-boost@v2.3.0
id: install-boost
with:
boost_version: 1.79.0
boost_version: 1.80.0
platform_version: 22.04
- name: Run CMake
run: |

View File

@@ -2,6 +2,8 @@
## master
* Removes `aedis::sync` class, see intro_sync.cpp for an alternative.
* Moves from `boost::optional` to `std::optional`. This is part of
moving to C++17.

View File

@@ -1,7 +1,7 @@
# At the moment the official build system is still autotools and this
# file is meant to support Aedis on windows.
# BOOST_ROOT=/opt/boost_1_79_0/ cmake -DCMAKE_CXX_FLAGS="-g -O0
# BOOST_ROOT=/opt/boost_1_80_0/ cmake -DCMAKE_CXX_FLAGS="-g -O0
# -std=c++20 -Wall -Wextra --coverage -fkeep-inline-functions
# -fkeep-static-functions" -DCMAKE_EXE_LINKER_FLAGS="--coverage"
# ~/my/aedis
@@ -42,7 +42,7 @@ write_basic_package_version_file(
COMPATIBILITY AnyNewerVersion
)
find_package(Boost 1.79 REQUIRED)
find_package(Boost 1.80 REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})
find_package(OpenSSL REQUIRED)
@@ -59,10 +59,8 @@ add_executable(echo_server examples/echo_server.cpp)
add_executable(intro examples/intro.cpp)
add_executable(intro_tls examples/intro_tls.cpp)
add_executable(intro_sync examples/intro_sync.cpp)
add_executable(intro_sync_tls examples/intro_sync_tls.cpp)
add_executable(serialization examples/serialization.cpp)
add_executable(subscriber examples/subscriber.cpp)
add_executable(subscriber_sync examples/subscriber_sync.cpp)
add_executable(subscriber_sentinel examples/subscriber_sentinel.cpp)
add_executable(test_low_level tests/low_level.cpp)
add_executable(low_level_sync tests/low_level_sync.cpp)
@@ -82,7 +80,6 @@ target_compile_features(test_connection_push PUBLIC cxx_std_20)
target_compile_features(test_connection_quit PUBLIC cxx_std_20)
target_link_libraries(intro_tls OpenSSL::Crypto OpenSSL::SSL)
target_link_libraries(intro_sync_tls OpenSSL::Crypto OpenSSL::SSL)
target_link_libraries(test_connection_tls OpenSSL::Crypto OpenSSL::SSL)
# Tests

View File

@@ -626,4 +626,5 @@ Some people that were helpful in the development of Aedis
* Vinícius dos Santos Oliveira ([vinipsmaker](https://github.com/vinipsmaker)): For useful discussion about how Aedis consumes buffers in the read operation (among other things).
* Petr Dannhofer ([Eddie-cz](https://github.com/Eddie-cz)): For helping me understand how the `AUTH` and `HELLO` command can influence each other.
* Mohammad Nejati ([ashtum](https://github.com/ashtum)): For pointing scenarios where calls to `async_exec` should fail when the connection is lost.
* Klemens Morgenstern ([klemens-morgenstern](https://github.com/klemens-morgenstern)): For useful discussion about timeouts, the synchronous interface and help with Asio.

View File

@@ -7,7 +7,7 @@
#include <tuple>
#include <string>
#include <thread>
#include <boost/asio/io_context.hpp>
#include <boost/asio.hpp>
#include <aedis.hpp>
// Include this in no more than one .cpp file.
@@ -17,34 +17,44 @@ namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using aedis::endpoint;
using connection = aedis::sync<aedis::connection<>>;
using connection = aedis::connection<>;
template <class Adapter>
auto exec(connection& conn, request const& req, Adapter adapter, boost::system::error_code& ec)
{
net::dispatch(
conn.get_executor(),
net::deferred([&]() { return conn.async_exec(req, adapter, net::deferred); }))
(net::redirect_error(net::use_future, ec)).get();
}
auto logger = [](auto const& ec)
{ std::clog << "Run: " << ec.message() << std::endl; };
int main()
{
try {
net::io_context ioc{1};
auto work = net::make_work_guard(ioc);
std::thread t1{[&]() { ioc.run(); }};
connection conn{work.get_executor()};
std::thread t2{[&]() {
boost::system::error_code ec;
endpoint ep{"127.0.0.1", "6379"};
conn.run(ep, ec);
connection conn{ioc};
std::thread t{[&]() {
conn.async_run({"127.0.0.1", "6379"}, logger);
ioc.run();
}};
request req;
req.push("PING");
req.push("QUIT");
boost::system::error_code ec;
std::tuple<std::string, aedis::ignore> resp;
conn.exec(req, adapt(resp));
std::cout << "Response: " << std::get<0>(resp) << std::endl;
exec(conn, req, adapt(resp), ec);
work.reset();
std::cout
<< "Exec: " << ec.message() << "\n"
<< "Response: " << std::get<0>(resp) << std::endl;
t1.join();
t2.join();
t.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}

View File

@@ -1,65 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <tuple>
#include <string>
#include <thread>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <aedis.hpp>
#include <aedis/ssl/sync.hpp>
#include <aedis/ssl/connection.hpp>
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using aedis::endpoint;
using connection = aedis::ssl::sync<aedis::ssl::connection<net::ssl::stream<net::ip::tcp::socket>>>;
bool verify_certificate(bool, net::ssl::verify_context&)
{
std::cout << "set_verify_callback" << std::endl;
return true;
}
int main()
{
try {
net::io_context ioc{1};
net::ssl::context ctx{net::ssl::context::sslv23};
auto work = net::make_work_guard(ioc);
std::thread t1{[&]() { ioc.run(); }};
connection conn{work.get_executor(), ctx};
conn.next_layer().next_layer().set_verify_mode(net::ssl::verify_peer);
conn.next_layer().next_layer().set_verify_callback(verify_certificate);
std::thread t2{[&]() {
boost::system::error_code ec;
conn.run({"127.0.0.1", "6379"}, ec);
}};
request req;
req.push("PING");
req.push("QUIT");
std::tuple<std::string, aedis::ignore> resp;
conn.exec(req, adapt(resp));
std::cout << "Response: " << std::get<0>(resp) << std::endl;
work.reset();
t1.join();
t2.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}

View File

@@ -1,65 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <tuple>
#include <string>
#include <thread>
#include <boost/asio.hpp>
#include <aedis.hpp>
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::node;
using aedis::resp3::request;
using aedis::endpoint;
using connection = aedis::sync<aedis::connection<>>;
// See subscriber.cpp for more info about how to run this example.
// Subscribe again everytime there is a disconnection.
void reconnect(connection& conn)
{
request req;
req.push("SUBSCRIBE", "channel");
endpoint ep{"127.0.0.1", "6379"};
for (;;) {
boost::system::error_code ec;
conn.run(ep, req, adapt(), ec);
conn.reset_stream();
std::cout << ec.message() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{1});
}
}
int main()
{
try {
net::io_context ioc{1};
auto work = net::make_work_guard(ioc);
connection conn{work.get_executor()};
std::thread t1{[&]() { ioc.run(); }};
std::thread t2{[&]() { reconnect(conn); }};
for (std::vector<node<std::string>> resp;;) {
conn.receive_push(adapt(resp));
print_push(resp);
resp.clear();
}
t1.join();
t2.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}

View File

@@ -10,7 +10,6 @@
#include <aedis/error.hpp>
#include <aedis/adapt.hpp>
#include <aedis/connection.hpp>
#include <aedis/sync.hpp>
#include <aedis/resp3/request.hpp>
/** @defgroup any Reference

View File

@@ -1,76 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_SSL_SYNC_HPP
#define AEDIS_SSL_SYNC_HPP
#include <aedis/sync_base.hpp>
#include <aedis/ssl/connection.hpp>
#include <boost/asio/io_context.hpp>
namespace aedis::ssl {
template <class>
class sync;
/** @brief An SSL synchronous connection to Redis.
* @ingroup any
*
* This class provides a wrapper to the `Connection` types passed as
* template parameter that provides synchronous communication to
* Redis.
*/
template <class AsyncReadWriteStream>
class sync<connection<boost::asio::ssl::stream<AsyncReadWriteStream>>> :
public sync_base<
typename connection<boost::asio::ssl::stream<AsyncReadWriteStream>>::executor_type,
sync<connection<boost::asio::ssl::stream<AsyncReadWriteStream>>>> {
public:
/// Next layer type.
using next_layer_type = connection<boost::asio::ssl::stream<AsyncReadWriteStream>>;
/// The executor type of the underlysing connection.
using executor_type = typename next_layer_type::executor_type;
/// Config options from the underlying connection.
using config = typename next_layer_type::config;
/// Operation options from the underlying connection.
using operation = typename next_layer_type::operation;
auto get_executor() noexcept { return conn_.get_executor();}
/** @brief Constructor
*
* @param ex Executor
* @param ctx The context for ssl operations.
* @param cfg Config options.
*/
explicit sync(executor_type ex, boost::asio::ssl::context& ctx, config cfg = config{})
: conn_{ex, ctx, cfg}
{ }
/** @brief Constructor
*
* @param ioc The io_context.
* @param ctx The context for ssl operations.
* @param cfg Config options.
*/
explicit sync(boost::asio::io_context& ioc, boost::asio::ssl::context& ctx, config cfg = config{})
: sync(ioc, ctx, std::move(cfg))
{ }
auto& next_layer() noexcept { return conn_; }
auto const& next_layer() const noexcept { return conn_; }
private:
template <class, class> friend class sync_base;
next_layer_type conn_;
};
} // aedis
#endif // AEDIS_SSL_SYNC_HPP

View File

@@ -1,71 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_SYNC_HPP
#define AEDIS_SYNC_HPP
#include <aedis/sync_base.hpp>
namespace aedis {
/** @brief High level synchronous connection to Redis.
* @ingroup any
*
* This class provides a wrapper to the `Connection` types passed as
* template parameter that provides synchronous communication to
* Redis.
*/
template <class Connection>
class sync:
public sync_base<
typename Connection::executor_type,
sync<Connection>> {
public:
/// Next layer type.
using next_layer_type = Connection;
/// Config options from the underlying connection.
using config = typename next_layer_type::config;
/// Operation options from the underlying connection.
using operation = typename next_layer_type::operation;
/// The executor type of the underlying connection.
using executor_type = typename next_layer_type::executor_type;
/// Returns the executor used in the underlying connection.
auto get_executor() noexcept { return conn_.get_executor();}
/** @brief Constructor
*
* @param ex Executor
* @param cfg Config options.
*/
explicit sync(executor_type ex, config cfg = config{}) : conn_{ex, cfg} { }
/** @brief Constructor
*
* @param ioc The io_context.
* @param cfg Config options.
*/
explicit sync(boost::asio::io_context& ioc, config cfg = config{})
: sync(ioc, std::move(cfg))
{ }
/// Returns a reference to the next layer.
auto& next_layer() noexcept { return conn_; }
/// Returns a const reference to the next layer.
auto next_layer() const noexcept -> auto const& { return conn_; }
private:
template <class, class> friend class sync_base;
Connection conn_;
};
} // aedis
#endif // AEDIS_SYNC_HPP

View File

@@ -1,268 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_SYNC_BASE_HPP
#define AEDIS_SYNC_BASE_HPP
#include <condition_variable>
#include <aedis/resp3/request.hpp>
namespace aedis {
/** @brief A Synchronous connection to Redis.
* @ingroup any
*
* This class is not meant to be instantiated directly but as base
* class in the CRTP.
*
* @tparam Executor The executor type.
* @tparam Derived The derived class type.
*
*/
template <class Executor, class Derived>
class sync_base {
public:
/// The executor type.
using executor_type = Executor;
/** @brief Calls `connection::async_exec` from the underlying connection object and waits for its completion.
*
* @param req The request.
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter>
auto
exec(resp3::request const& req, ResponseAdapter adapter, boost::system::error_code& ec)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, &ec, &res, &sh, &req, adapter]()
{
derived().next_layer().async_exec(req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(derived().get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Calls `connection::async_exec` from the underlying connection object and waits for its completion.
*
* @param req The request.
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter = detail::response_traits<void>::adapter_type>
auto exec(resp3::request const& req, ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = exec(req, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Calls `connection::async_receive_push` from the underlying connection object and waits for its completion.
*
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes received.
*/
template <class ResponseAdapter>
auto receive_push(ResponseAdapter adapter, boost::system::error_code& ec)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, &ec, &res, &sh, adapter]()
{
derived().next_layer().async_receive_push(adapter, [&ec, &res, &sh](auto const& e, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = e;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(derived().get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Calls `connection::async_receive_push` from the underlying connection object and waits for its completion.
*
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes received.
*/
template <class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
auto receive_push(ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = receive_push(adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Calls `connection::async_run` from the underlying connection and waits for its completion.
*
* @param ep The Redis server endpoint.
* @param ec Error code.
*/
void run(endpoint ep, boost::system::error_code& ec)
{
sync_helper sh;
auto f = [this, ep, &ec, &sh]()
{
derived().next_layer().async_run(ep, [&ec, &sh](auto const& e) {
std::unique_lock ul(sh.mutex);
ec = e;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(derived().get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
}
/** @brief Calls `connection::async_run` from the underlying connection and waits for its completion.
*
* @param ep The Redis server endpoint.
* @throws std::system_error.
*/
void run(endpoint ep)
{
boost::system::error_code ec;
run(std::move(ep), ec);
if (ec)
throw std::system_error(ec);
}
/** @brief Calls `connection::async_run` from the underlying connection and waits for its completion.
*
* @param ep The Redis server endpoint.
* @param req The request.
* @param adapter The response adapter.
* @param ec Error code in case of error.
*/
template <class ResponseAdapter>
auto run(endpoint& ep, resp3::request const& req, ResponseAdapter adapter, boost::system::error_code& ec)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, ep, &ec, &res, &sh, &req, adapter]() mutable
{
derived().next_layer().async_run(ep, req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(derived().get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Calls `connection::async_run` from the underlying connection and waits for its completion.
*
* @param ep The Redis server endpoint.
* @param req The request.
* @param adapter The response adapter.
* @throws std::system_error.
*/
template <class ResponseAdapter = detail::response_traits<void>::adapter_type>
auto run(endpoint& ep, resp3::request const& req, ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = run(ep, req, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Calls `connection::cancel` in the underlying connection object.
*
* @param op The operation to cancel.
* @returns The number of operations canceled.
*/
template <class Operation>
auto cancel(Operation op)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, op, &res, &sh]()
{
std::unique_lock ul(sh.mutex);
res = derived().next_layer().cancel(op);
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
};
boost::asio::dispatch(boost::asio::bind_executor(derived().get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/// @brief Calls `connection::reset` in the underlying connection object.
void reset_stream()
{
sync_helper sh;
auto f = [this, &sh]()
{
std::unique_lock ul(sh.mutex);
derived().reset_stream();
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
};
boost::asio::dispatch(boost::asio::bind_executor(derived().get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
}
private:
Derived& derived() { return static_cast<Derived&>(*this); }
struct sync_helper {
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
};
};
} // aedis
#endif // AEDIS_SYNC_BASE_HPP