2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Unifies ssl and plain connections.

This commit is contained in:
Marcelo Zimbres
2023-05-07 15:40:13 +02:00
parent 30a6e34e4e
commit 6f9fd5b2fb
37 changed files with 251 additions and 425 deletions

View File

@@ -84,7 +84,7 @@ endif()
#=======================================================================
add_executable(cpp20_intro examples/cpp20_intro.cpp)
target_link_libraries(cpp20_intro common)
target_link_libraries(cpp20_intro PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
target_compile_features(cpp20_intro PUBLIC cxx_std_20)
add_test(cpp20_intro cpp20_intro)
if (MSVC)
@@ -93,7 +93,7 @@ if (MSVC)
endif()
add_executable(cpp20_streams examples/cpp20_streams.cpp)
target_link_libraries(cpp20_streams common)
target_link_libraries(cpp20_streams PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
target_compile_features(cpp20_streams PUBLIC cxx_std_20)
if (MSVC)
target_compile_options(cpp20_streams PRIVATE /bigobj)
@@ -101,6 +101,7 @@ if (MSVC)
endif()
add_executable(cpp17_intro examples/cpp17_intro.cpp)
target_link_libraries(cpp17_intro PRIVATE OpenSSL::Crypto OpenSSL::SSL)
target_compile_features(cpp17_intro PUBLIC cxx_std_17)
add_test(cpp17_intro cpp17_intro)
if (MSVC)
@@ -111,18 +112,19 @@ endif()
if (NOT MSVC)
add_executable(cpp17_intro_sync examples/cpp17_intro_sync.cpp)
target_compile_features(cpp17_intro_sync PUBLIC cxx_std_17)
target_link_libraries(cpp17_intro_sync PRIVATE OpenSSL::Crypto OpenSSL::SSL)
add_test(cpp17_intro_sync cpp17_intro_sync)
endif()
if (NOT MSVC)
add_executable(cpp20_chat_room examples/cpp20_chat_room.cpp)
target_compile_features(cpp20_chat_room PUBLIC cxx_std_20)
target_link_libraries(cpp20_chat_room common)
target_link_libraries(cpp20_chat_room PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
endif()
add_executable(cpp20_containers examples/cpp20_containers.cpp)
target_compile_features(cpp20_containers PUBLIC cxx_std_20)
target_link_libraries(cpp20_containers common)
target_link_libraries(cpp20_containers PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
add_test(cpp20_containers cpp20_containers)
if (MSVC)
target_compile_options(cpp20_containers PRIVATE /bigobj)
@@ -132,12 +134,12 @@ endif()
if (NOT MSVC)
add_executable(cpp20_echo_server examples/cpp20_echo_server.cpp)
target_compile_features(cpp20_echo_server PUBLIC cxx_std_20)
target_link_libraries(cpp20_echo_server common)
target_link_libraries(cpp20_echo_server PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
endif()
add_executable(cpp20_resolve_with_sentinel examples/cpp20_resolve_with_sentinel.cpp)
target_compile_features(cpp20_resolve_with_sentinel PUBLIC cxx_std_20)
target_link_libraries(cpp20_resolve_with_sentinel common)
target_link_libraries(cpp20_resolve_with_sentinel PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
#add_test(cpp20_resolve_with_sentinel cpp20_resolve_with_sentinel)
if (MSVC)
target_compile_options(cpp20_resolve_with_sentinel PRIVATE /bigobj)
@@ -146,7 +148,7 @@ endif()
add_executable(cpp20_json examples/cpp20_json.cpp)
target_compile_features(cpp20_json PUBLIC cxx_std_20)
target_link_libraries(cpp20_json common)
target_link_libraries(cpp20_json PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
add_test(cpp20_json cpp20_json)
if (MSVC)
target_compile_options(cpp20_json PRIVATE /bigobj)
@@ -157,7 +159,7 @@ if (Protobuf_FOUND)
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS examples/person.proto)
add_executable(cpp20_protobuf examples/cpp20_protobuf.cpp ${PROTO_SRCS} ${PROTO_HDRS})
target_compile_features(cpp20_protobuf PUBLIC cxx_std_20)
target_link_libraries(cpp20_protobuf common ${Protobuf_LIBRARIES})
target_link_libraries(cpp20_protobuf PRIVATE OpenSSL::Crypto OpenSSL::SSL common ${Protobuf_LIBRARIES})
target_include_directories(cpp20_protobuf PUBLIC ${Protobuf_INCLUDE_DIRS} ${CMAKE_CURRENT_BINARY_DIR})
add_test(cpp20_protobuf cpp20_protobuf)
if (MSVC)
@@ -168,7 +170,7 @@ endif()
add_executable(cpp20_subscriber examples/cpp20_subscriber.cpp)
target_compile_features(cpp20_subscriber PUBLIC cxx_std_20)
target_link_libraries(cpp20_subscriber common)
target_link_libraries(cpp20_subscriber PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
if (MSVC)
target_compile_options(cpp20_subscriber PRIVATE /bigobj)
target_compile_definitions(cpp20_subscriber PRIVATE _WIN32_WINNT=0x0601)
@@ -177,8 +179,7 @@ endif()
add_executable(cpp20_intro_tls examples/cpp20_intro_tls.cpp)
target_compile_features(cpp20_intro_tls PUBLIC cxx_std_20)
add_test(cpp20_intro_tls cpp20_intro_tls)
target_link_libraries(cpp20_intro_tls OpenSSL::Crypto OpenSSL::SSL)
target_link_libraries(cpp20_intro_tls common)
target_link_libraries(cpp20_intro_tls PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
if (MSVC)
target_compile_options(cpp20_intro_tls PRIVATE /bigobj)
target_compile_definitions(cpp20_intro_tls PRIVATE _WIN32_WINNT=0x0601)
@@ -187,7 +188,7 @@ endif()
add_executable(cpp20_low_level_async tests/cpp20_low_level_async.cpp)
target_compile_features(cpp20_low_level_async PUBLIC cxx_std_20)
add_test(cpp20_low_level_async cpp20_low_level_async)
target_link_libraries(cpp20_low_level_async common)
target_link_libraries(cpp20_low_level_async PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
if (MSVC)
target_compile_options(cpp20_low_level_async PRIVATE /bigobj)
target_compile_definitions(cpp20_low_level_async PRIVATE _WIN32_WINNT=0x0601)
@@ -217,7 +218,7 @@ endif()
add_executable(test_conn_exec tests/conn_exec.cpp)
target_compile_features(test_conn_exec PUBLIC cxx_std_20)
target_link_libraries(test_conn_exec test_common)
target_link_libraries(test_conn_exec PRIVATE OpenSSL::Crypto OpenSSL::SSL test_common)
add_test(test_conn_exec test_conn_exec)
if (MSVC)
target_compile_options(test_conn_exec PRIVATE /bigobj)
@@ -226,7 +227,7 @@ endif()
add_executable(test_conn_exec_retry tests/conn_exec_retry.cpp)
target_compile_features(test_conn_exec_retry PUBLIC cxx_std_20)
target_link_libraries(test_conn_exec_retry test_common)
target_link_libraries(test_conn_exec_retry PRIVATE OpenSSL::Crypto OpenSSL::SSL test_common)
add_test(test_conn_exec_retry test_conn_exec_retry)
if (MSVC)
target_compile_options(test_conn_exec_retry PRIVATE /bigobj)
@@ -235,7 +236,7 @@ endif()
add_executable(test_conn_push tests/conn_push.cpp)
target_compile_features(test_conn_push PUBLIC cxx_std_20)
target_link_libraries(test_conn_push test_common)
target_link_libraries(test_conn_push PRIVATE OpenSSL::Crypto OpenSSL::SSL test_common)
add_test(test_conn_push test_conn_push)
if (MSVC)
target_compile_options(test_conn_push PRIVATE /bigobj)
@@ -244,6 +245,7 @@ endif()
add_executable(test_conn_quit tests/conn_quit.cpp)
target_compile_features(test_conn_quit PUBLIC cxx_std_17)
target_link_libraries(test_conn_quit PRIVATE OpenSSL::Crypto OpenSSL::SSL test_common)
add_test(test_conn_quit test_conn_quit)
if (MSVC)
target_compile_options(test_conn_quit PRIVATE /bigobj)
@@ -252,7 +254,7 @@ endif()
add_executable(test_conn_reconnect tests/conn_reconnect.cpp)
target_compile_features(test_conn_reconnect PUBLIC cxx_std_20)
target_link_libraries(test_conn_reconnect common test_common)
target_link_libraries(test_conn_reconnect PRIVATE OpenSSL::Crypto OpenSSL::SSL common test_common)
add_test(test_conn_reconnect test_conn_reconnect)
if (MSVC)
target_compile_options(test_conn_reconnect PRIVATE /bigobj)
@@ -262,7 +264,7 @@ endif()
add_executable(test_conn_tls tests/conn_tls.cpp)
add_test(test_conn_tls test_conn_tls)
target_compile_features(test_conn_tls PUBLIC cxx_std_17)
target_link_libraries(test_conn_tls OpenSSL::Crypto OpenSSL::SSL)
target_link_libraries(test_conn_tls PRIVATE OpenSSL::Crypto OpenSSL::SSL)
if (MSVC)
target_compile_options(test_conn_tls PRIVATE /bigobj)
target_compile_definitions(test_conn_tls PRIVATE _WIN32_WINNT=0x0601)
@@ -279,6 +281,7 @@ endif()
add_executable(test_conn_run_cancel tests/conn_run_cancel.cpp)
target_compile_features(test_conn_run_cancel PUBLIC cxx_std_20)
add_test(test_conn_run_cancel test_conn_run_cancel)
target_link_libraries(test_conn_run_cancel PRIVATE OpenSSL::Crypto OpenSSL::SSL)
if (MSVC)
target_compile_options(test_conn_run_cancel PRIVATE /bigobj)
target_compile_definitions(test_conn_run_cancel PRIVATE _WIN32_WINNT=0x0601)
@@ -286,7 +289,7 @@ endif()
add_executable(test_conn_exec_cancel tests/conn_exec_cancel.cpp)
target_compile_features(test_conn_exec_cancel PUBLIC cxx_std_20)
target_link_libraries(test_conn_exec_cancel common test_common)
target_link_libraries(test_conn_exec_cancel PRIVATE OpenSSL::Crypto OpenSSL::SSL common test_common)
add_test(test_conn_exec_cancel test_conn_exec_cancel)
if (MSVC)
target_compile_options(test_conn_exec_cancel PRIVATE /bigobj)
@@ -295,7 +298,7 @@ endif()
add_executable(test_conn_exec_cancel2 tests/conn_exec_cancel2.cpp)
target_compile_features(test_conn_exec_cancel2 PUBLIC cxx_std_20)
target_link_libraries(test_conn_exec_cancel2 common test_common)
target_link_libraries(test_conn_exec_cancel2 PRIVATE OpenSSL::Crypto OpenSSL::SSL common test_common)
add_test(test_conn_exec_cancel2 test_conn_exec_cancel2)
if (MSVC)
target_compile_options(test_conn_exec_cancel2 PRIVATE /bigobj)
@@ -304,7 +307,7 @@ endif()
add_executable(test_conn_exec_error tests/conn_exec_error.cpp)
target_compile_features(test_conn_exec_error PUBLIC cxx_std_17)
target_link_libraries(test_conn_exec_error common test_common)
target_link_libraries(test_conn_exec_error PRIVATE OpenSSL::Crypto OpenSSL::SSL common test_common)
add_test(test_conn_exec_error test_conn_exec_error)
if (MSVC)
target_compile_options(test_conn_exec_error PRIVATE /bigobj)
@@ -313,7 +316,7 @@ endif()
add_executable(test_conn_echo_stress tests/conn_echo_stress.cpp)
target_compile_features(test_conn_echo_stress PUBLIC cxx_std_20)
target_link_libraries(test_conn_echo_stress common test_common)
target_link_libraries(test_conn_echo_stress PRIVATE OpenSSL::Crypto OpenSSL::SSL common test_common)
add_test(test_conn_echo_stress test_conn_echo_stress)
if (MSVC)
target_compile_options(test_conn_echo_stress PRIVATE /bigobj)
@@ -330,7 +333,7 @@ endif()
add_executable(test_issue_50 tests/issue_50.cpp)
target_compile_features(test_issue_50 PUBLIC cxx_std_20)
target_link_libraries(test_issue_50 common)
target_link_libraries(test_issue_50 PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
add_test(test_issue_50 test_issue_50)
if (MSVC)
target_compile_options(test_issue_50 PRIVATE /bigobj)
@@ -339,7 +342,7 @@ endif()
add_executable(test_conn_check_health tests/conn_check_health.cpp)
target_compile_features(test_conn_check_health PUBLIC cxx_std_17)
target_link_libraries(test_conn_check_health common)
target_link_libraries(test_conn_check_health PRIVATE OpenSSL::Crypto OpenSSL::SSL common)
add_test(test_conn_check_health test_conn_check_health)
if (MSVC)
target_compile_options(test_conn_check_health PRIVATE /bigobj)
@@ -348,7 +351,7 @@ endif()
add_executable(test_run tests/run.cpp)
target_compile_features(test_run PUBLIC cxx_std_17)
target_link_libraries(test_run test_common)
target_link_libraries(test_run PRIVATE OpenSSL::Crypto OpenSSL::SSL test_common)
add_test(test_run test_run)
if (MSVC)
target_compile_options(test_run PRIVATE /bigobj)

View File

@@ -678,7 +678,7 @@ Acknowledgement to people that helped shape Boost.Redis
* Mohammad Nejati ([ashtum](https://github.com/ashtum)): For pointing out scenarios where calls to `async_exec` should fail when the connection is lost.
* Klemens Morgenstern ([klemens-morgenstern](https://github.com/klemens-morgenstern)): For useful discussion about timeouts, cancellation, synchronous interfaces and general help with Asio.
* Vinnie Falco ([vinniefalco](https://github.com/vinniefalco)): For general suggestions about how to improve the code and the documentation.
* Bram Veldhoen ([bveldhoen](https://github.com/bveldhoen)): For contributing a Redis streams example.
* Bram Veldhoen ([bveldhoen](https://github.com/bveldhoen)): For contributing a Redis-streams example.
Also many thanks to all individuals that participated in the Boost
review

View File

@@ -32,7 +32,8 @@ auto main(int argc, char * argv[]) -> int
response<std::string> resp;
net::io_context ioc;
connection conn{ioc};
net::ssl::context ctx{net::ssl::context::tls_client};
connection conn{ioc, ctx};
conn.async_run(cfg, {}, net::detached);

View File

@@ -12,6 +12,7 @@
// Include this in no more than one .cpp file.
#include <boost/redis/src.hpp>
namespace net = boost::asio;
using boost::redis::sync_connection;
using boost::redis::request;
using boost::redis::response;
@@ -27,7 +28,8 @@ auto main(int argc, char * argv[]) -> int
cfg.addr.port = argv[2];
}
sync_connection conn;
net::ssl::context ctx{net::ssl::context::tls_client};
sync_connection conn{ctx};
conn.run(cfg);
request req;

View File

@@ -38,7 +38,7 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");
while (!conn->is_cancelled()) {
while (conn->will_reconnect()) {
// Subscribe to channels.
co_await conn->async_exec(req);
@@ -75,12 +75,13 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
auto co_main(config cfg) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(ex, *ctx);
auto stream = std::make_shared<stream_descriptor>(ex, ::dup(STDIN_FILENO));
net::co_spawn(ex, receiver(conn), net::detached);
net::co_spawn(ex, publisher(stream, conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
signal_set sig_set{ex, SIGINT, SIGTERM};
co_await sig_set.async_wait();

View File

@@ -91,8 +91,9 @@ auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
// Called from the main function (see main.cpp)
net::awaitable<void> co_main(config cfg)
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(co_await net::this_coro::executor, *ctx);
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
co_await store(conn);
co_await transaction(conn);

View File

@@ -58,9 +58,10 @@ auto listener(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto co_main(config cfg) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(ex, *ctx);
net::co_spawn(ex, listener(conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
signal_set sig_set(ex, SIGINT, SIGTERM);
co_await sig_set.async_wait();

View File

@@ -23,8 +23,9 @@ using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(co_await net::this_coro::executor, *ctx);
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
// A request containing only a ping command.
request req;

View File

@@ -4,7 +4,7 @@
* accompanying file LICENSE.txt)
*/
#include <boost/redis/ssl/connection.hpp>
#include <boost/redis/connection.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/detached.hpp>
@@ -18,7 +18,7 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::logger;
using connection = net::deferred_t::as_default_on_t<boost::redis::ssl::connection>;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
auto verify_certificate(bool, net::ssl::verify_context&) -> bool
{
@@ -28,14 +28,15 @@ auto verify_certificate(bool, net::ssl::verify_context&) -> bool
auto co_main(config cfg) -> net::awaitable<void>
{
cfg.use_ssl = true;
cfg.username = "aedis";
cfg.password = "aedis";
cfg.addr.host = "db.occase.de";
cfg.addr.port = "6380";
net::ssl::context ctx{net::ssl::context::sslv23};
auto conn = std::make_shared<connection>(co_await net::this_coro::executor, ctx);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(co_await net::this_coro::executor, *ctx);
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
request req;
req.push("PING");

View File

@@ -48,8 +48,9 @@ void boost_redis_from_bulk(user& u, std::string_view sv, boost::system::error_co
auto co_main(config cfg) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(ex, *ctx);
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
// user object that will be stored in Redis in json format.
user const u{"Joao", "58", "Brazil"};

View File

@@ -47,8 +47,9 @@ using tutorial::boost_redis_from_bulk;
net::awaitable<void> co_main(config cfg)
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(ex, *ctx);
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
person p;
p.set_name("Louis");

View File

@@ -34,7 +34,8 @@ auto resolve_master_address(std::vector<address> const& addresses) -> net::await
req.push("SENTINEL", "get-master-addr-by-name", "mymaster");
req.push("QUIT");
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(co_await net::this_coro::executor, *ctx);
response<std::optional<std::array<std::string, 2>>, ignore_t> resp;
for (auto addr : addresses) {
@@ -44,7 +45,7 @@ auto resolve_master_address(std::vector<address> const& addresses) -> net::await
// TODO: async_run and async_exec should be lauched in
// parallel here so we can wait for async_run completion
// before eventually calling it again.
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
co_await conn->async_exec(req, resp, redir(ec));
conn->cancel();
conn->reset_stream();

View File

@@ -84,12 +84,13 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto co_main(config cfg) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(ex, *ctx);
net::co_spawn(ex, stream_reader(conn), net::detached);
// Disable health checks.
cfg.health_check_interval = std::chrono::seconds{0};
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
signal_set sig_set(ex, SIGINT, SIGTERM);
co_await sig_set.async_wait();

View File

@@ -51,7 +51,7 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");
while (!conn->is_cancelled()) {
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req);
@@ -75,9 +75,10 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto co_main(config cfg) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(ex, *ctx);
net::co_spawn(ex, receiver(conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
signal_set sig_set(ex, SIGINT, SIGTERM);
co_await sig_set.async_wait();

View File

@@ -20,9 +20,9 @@ namespace boost::redis
class sync_connection {
public:
sync_connection()
sync_connection(boost::asio::ssl::context& ctx)
: ioc_{1}
, conn_{std::make_shared<connection>(ioc_)}
, conn_{std::make_shared<connection>(ioc_, ctx)}
{ }
~sync_connection()

View File

@@ -27,6 +27,9 @@ struct address {
* @ingroup high-level-api
*/
struct config {
/// Uses SSL instead of a plain connection.
bool use_ssl = false;
/// Address of the Redis server.
address addr = address{"127.0.0.1", "6379"};
@@ -60,10 +63,16 @@ struct config {
/// Time the SSL handshake operation is allowed to last.
std::chrono::steady_clock::duration ssl_handshake_timeout = std::chrono::seconds{10};
/// @brief Health checks interval.
/** Health checks interval.
*
* To disable health-checks pass zero as duration.
*/
std::chrono::steady_clock::duration health_check_interval = std::chrono::seconds{2};
/// Time waited before trying a reconnection.
/** @brief Time waited before trying a reconnection.
*
* To disable reconnection pass zero as duration.
*/
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};
};

View File

@@ -9,101 +9,79 @@
#include <boost/redis/detail/connection_base.hpp>
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/detail/handshaker.hpp>
#include <boost/redis/detail/reconnection.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <chrono>
#include <memory>
namespace boost::redis {
namespace detail
{
template <class Executor>
class dummy_handshaker {
public:
dummy_handshaker(Executor) {}
template <class Stream, class CompletionToken>
auto async_handshake(Stream&, CompletionToken&& token)
{ return asio::post(std::move(token)); }
void set_config(config const&) {}
std::size_t cancel(operation) { return 0;}
constexpr bool is_dummy() const noexcept {return true;}
};
}
/** @brief A connection to the Redis server.
* @ingroup high-level-api
/** \brief A SSL connection to the Redis server.
* \ingroup high-level-api
*
* For more details, please see the documentation of each individual
* function.
* This class keeps a healthy connection to the Redis instance where
* commands can be sent at any time. For more details, please see the
* documentation of each individual function.
*
* @tparam Socket The socket type e.g. asio::ip::tcp::socket.
*
*/
template <class Socket>
class basic_connection :
private detail::connection_base<
typename Socket::executor_type,
basic_connection<Socket>> {
template <class Executor>
class basic_connection : private detail::connection_base<Executor, basic_connection<Executor>> {
public:
/// Executor type.
using executor_type = typename Socket::executor_type;
/// Type of the next layer
using next_layer_type = Socket;
using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
/// Executor type.
using executor_type = Executor;
/// Rebinds the socket type to another executor.
template <class Executor1>
struct rebind_executor
{
/// The socket type when rebound to the specified executor.
using other = basic_connection<typename next_layer_type::template rebind_executor<Executor1>::other>;
/// The connection type when rebound to the specified executor.
using other = basic_connection<Executor1>;
};
using base_type = detail::connection_base<executor_type, basic_connection<Socket>>;
using base_type = redis::detail::connection_base<Executor, basic_connection<Executor>>;
/// Contructs from an executor.
explicit
basic_connection(executor_type ex)
basic_connection(executor_type ex, asio::ssl::context& ctx)
: base_type{ex}
, ctx_{&ctx}
, reconn_{ex}
, runner_{ex, {}}
, stream_{ex}
{}
, stream_{std::make_unique<next_layer_type>(ex, ctx)}
{ }
/// Contructs from a context.
explicit
basic_connection(asio::io_context& ioc)
: basic_connection(ioc.get_executor())
basic_connection(asio::io_context& ioc, asio::ssl::context& ctx)
: basic_connection(ioc.get_executor(), ctx)
{ }
/// Returns the associated executor.
auto get_executor() {return stream_.get_executor();}
auto get_executor() {return stream_->get_executor();}
/// Resets the underlying stream.
/// Reset the underlying stream.
void reset_stream()
{
if (stream_.is_open()) {
system::error_code ec;
stream_.shutdown(asio::ip::tcp::socket::shutdown_both, ec);
stream_.close(ec);
}
stream_ = std::make_unique<next_layer_type>(stream_->get_executor(), *ctx_);
}
/// Returns a reference to the next layer.
auto next_layer() noexcept -> auto& { return stream_; }
auto& next_layer() noexcept { return *stream_; }
/// Returns a const reference to the next layer.
auto next_layer() const noexcept -> auto const& { return stream_; }
auto const& next_layer() const noexcept { return *stream_; }
/** @brief Starts underlying connection operations.
*
@@ -153,7 +131,8 @@ public:
Logger l = Logger{},
CompletionToken token = CompletionToken{})
{
reconn_.set_wait_interval(cfg.reconnect_wait_interval);
use_ssl_ = cfg.use_ssl;
reconn_.set_config(cfg.reconnect_wait_interval);
runner_.set_config(cfg);
l.set_prefix(runner_.get_config().log_prefix);
return reconn_.async_run(*this, l, std::move(token));
@@ -262,11 +241,11 @@ public:
{ base_type::reserve(read, write); }
/// Returns true if the connection was canceled.
bool is_cancelled() const noexcept
{ return reconn_.is_cancelled();}
bool will_reconnect() const noexcept
{ return reconn_.will_reconnect();}
private:
using runner_type = detail::runner<executor_type, detail::dummy_handshaker>;
using runner_type = detail::runner<executor_type, detail::handshaker>;
using reconnection_type = detail::basic_reconnection<executor_type>;
using this_type = basic_connection<next_layer_type>;
@@ -290,20 +269,27 @@ private:
{ return base_type::async_run_impl(l, std::move(token)); }
void close()
{ reset_stream(); }
{
if (stream_->next_layer().is_open())
stream_->next_layer().close();
}
auto is_open() const noexcept { return stream_.is_open(); }
auto lowest_layer() noexcept -> auto& { return stream_.lowest_layer(); }
auto is_open() const noexcept { return stream_->next_layer().is_open(); }
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
auto use_ssl() const noexcept { return use_ssl_;}
bool use_ssl_ = false;
asio::ssl::context* ctx_;
reconnection_type reconn_;
runner_type runner_;
Socket stream_;
std::unique_ptr<next_layer_type> stream_;
};
/** \brief A connection that uses a asio::ip::tcp::socket.
/** \brief A connection that uses the asio::any_io_executor.
* \ingroup high-level-api
*/
using connection = basic_connection<asio::ip::tcp::socket>;
using connection = basic_connection<asio::any_io_executor>;
} // boost::redis

View File

@@ -83,6 +83,11 @@ public:
, cmds_{info->get_number_of_commands()}
{}
auto make_adapter() noexcept
{
return [i = index_, adpt = adapter_] (resp3::basic_node<std::string_view> const& nd, system::error_code& ec) mutable { adpt(i, nd, ec); };
}
template <class Self>
void
operator()( Self& self
@@ -103,11 +108,12 @@ public:
// to hand it to the push consumer. To do that we need
// some data in the read bufer.
if (conn_->read_buffer_.empty()) {
BOOST_ASIO_CORO_YIELD
asio::async_read_until(
conn_->next_layer(),
conn_->make_dynamic_buffer(),
"\r\n", std::move(self));
if (conn_->derived().use_ssl())
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn_->next_layer(), conn_->make_dynamic_buffer(), "\r\n", std::move(self));
else
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), "\r\n", std::move(self));
BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
if (info_->stop_requested()) {
self.complete(asio::error::operation_aborted, 0);
@@ -125,12 +131,10 @@ public:
}
//-----------------------------------
BOOST_ASIO_CORO_YIELD
redis::detail::async_read(
conn_->next_layer(),
conn_->make_dynamic_buffer(),
[i = index_, adpt = adapter_] (resp3::basic_node<std::string_view> const& nd, system::error_code& ec) mutable { adpt(i, nd, ec); },
std::move(self));
if (conn_->derived().use_ssl())
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer(), conn_->make_dynamic_buffer(), make_adapter(), std::move(self));
else
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn_->next_layer().next_layer(), conn_->make_dynamic_buffer(), make_adapter(), std::move(self));
++index_;
@@ -166,8 +170,11 @@ struct receive_op {
conn->channel_.async_receive(std::move(self));
BOOST_REDIS_CHECK_OP1(;);
BOOST_ASIO_CORO_YIELD
redis::detail::async_read(conn->next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self));
if (conn->derived().use_ssl())
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn->next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self));
else
BOOST_ASIO_CORO_YIELD redis::detail::async_read(conn->next_layer().next_layer(), conn->make_dynamic_buffer(), adapter, std::move(self));
if (ec || is_cancelled(self)) {
conn->cancel(operation::run);
conn->cancel(operation::receive);
@@ -342,8 +349,11 @@ struct writer_op {
BOOST_ASIO_CORO_REENTER (coro) for (;;)
{
while (conn_->coalesce_requests()) {
BOOST_ASIO_CORO_YIELD
asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
if (conn_->derived().use_ssl())
BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
else
BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
logger_.on_write(ec, conn_->write_buffer_);
BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run););
@@ -394,11 +404,10 @@ struct reader_op {
BOOST_ASIO_CORO_REENTER (coro) for (;;)
{
BOOST_ASIO_CORO_YIELD
asio::async_read_until(
conn->next_layer(),
conn->make_dynamic_buffer(),
"\r\n", std::move(self));
if (conn->derived().use_ssl())
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer(), conn->make_dynamic_buffer(), "\r\n", std::move(self));
else
BOOST_ASIO_CORO_YIELD asio::async_read_until(conn->next_layer().next_layer(), conn->make_dynamic_buffer(), "\r\n", std::move(self));
if (ec == asio::error::eof) {
conn->cancel(operation::run);

View File

@@ -19,7 +19,7 @@
#include <string>
#include <chrono>
namespace boost::redis::ssl::detail
namespace boost::redis::detail
{
template <class Handshaker, class Stream>
@@ -119,6 +119,6 @@ private:
std::chrono::steady_clock::duration timeout_;
};
} // boost::redis::ssl::detail
} // boost::redis::detail
#endif // BOOST_REDIS_SSL_CONNECTOR_HPP

View File

@@ -107,7 +107,7 @@ public:
{
BOOST_ASIO_CORO_REENTER (coro_)
{
if (checker_->ping_interval_.count() == 0) {
if (checker_->ping_interval_ == std::chrono::seconds::zero()) {
BOOST_ASIO_CORO_YIELD
asio::post(std::move(self));
self.complete({});

View File

@@ -30,10 +30,9 @@ struct reconnection_op {
{
BOOST_ASIO_CORO_YIELD
conn_->async_run_one(logger_, std::move(self));
conn_->reset_stream();
conn_->cancel(operation::receive);
logger_.on_connection_lost(ec);
if (reconn_->is_cancelled() || is_cancelled(self)) {
if (!reconn_->will_reconnect() || is_cancelled(self)) {
reconn_->cancel(operation::reconnection);
self.complete(!!ec ? ec : asio::error::operation_aborted);
return;
@@ -43,10 +42,11 @@ struct reconnection_op {
BOOST_ASIO_CORO_YIELD
reconn_->timer_.async_wait(std::move(self));
BOOST_REDIS_CHECK_OP0(;)
if (reconn_->is_cancelled()) {
if (!reconn_->will_reconnect()) {
self.complete(asio::error::operation_aborted);
return;
}
conn_->reset_stream();
}
}
};
@@ -56,19 +56,16 @@ struct reconnection_op {
template <class Executor>
class basic_reconnection {
public:
/// Executor type.
using executor_type = Executor;
basic_reconnection(Executor ex)
: timer_{ex}
, is_cancelled_{false}
{}
basic_reconnection(asio::io_context& ioc, std::chrono::steady_clock::duration wait_interval)
: basic_reconnection{ioc.get_executor(), wait_interval}
{}
/// Rebinds to a new executor type.
template <class Executor1>
struct rebind_executor
{
@@ -92,7 +89,7 @@ public:
>(detail::reconnection_op<basic_reconnection, Connection, Logger>{this, &conn, l}, token, conn);
}
void set_wait_interval(std::chrono::steady_clock::duration wait_interval)
void set_config(std::chrono::steady_clock::duration wait_interval)
{
wait_interval_ = wait_interval;
}
@@ -102,7 +99,7 @@ public:
switch (op) {
case operation::reconnection:
case operation::all:
is_cancelled_ = true;
wait_interval_ = std::chrono::seconds::zero();
timer_.cancel();
break;
default: /* ignore */;
@@ -111,8 +108,8 @@ public:
return 0U;
}
bool is_cancelled() const noexcept {return is_cancelled_;}
void reset() noexcept {is_cancelled_ = false;}
bool will_reconnect() const noexcept
{ return wait_interval_ != std::chrono::seconds::zero();}
private:
using timer_type =
@@ -125,11 +122,8 @@ private:
timer_type timer_;
std::chrono::steady_clock::duration wait_interval_ = std::chrono::seconds{1};
bool is_cancelled_;
};
using reconnection = basic_reconnection<asio::any_io_executor>;
} // boost::redis
#endif // BOOST_REDIS_RECONNECTION_HPP

View File

@@ -136,7 +136,7 @@ struct run_all_op {
logger_.on_connect(ec, runner_->ctor_.endpoint());
BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
if (!runner_->hsher_.is_dummy()) {
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
logger_.on_ssl_handshake(ec);

View File

@@ -1,215 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_SSL_CONNECTION_HPP
#define BOOST_REDIS_SSL_CONNECTION_HPP
#include <boost/redis/detail/connection_base.hpp>
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/ssl/detail/handshaker.hpp>
#include <boost/redis/detail/reconnection.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/logger.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <chrono>
#include <memory>
namespace boost::redis::ssl {
template <class>
class basic_connection;
/** \brief A SSL connection to the Redis server.
* \ingroup high-level-api
*
* This class keeps a healthy connection to the Redis instance where
* commands can be sent at any time. For more details, please see the
* documentation of each individual function.
*
* @tparam Socket The socket type e.g. asio::ip::tcp::socket.
*
*/
template <class Socket>
class basic_connection<asio::ssl::stream<Socket>> :
private redis::detail::connection_base<
typename asio::ssl::stream<Socket>::executor_type,
basic_connection<asio::ssl::stream<Socket>>> {
public:
/// Type of the next layer
using next_layer_type = asio::ssl::stream<Socket>;
/// Executor type.
using executor_type = typename next_layer_type::executor_type;
/// Rebinds the socket type to another executor.
template <class Executor1>
struct rebind_executor
{
/// The socket type when rebound to the specified executor.
using other = basic_connection<asio::ssl::stream<typename Socket::template rebind_executor<Executor1>::other>>;
};
using base_type = redis::detail::connection_base<executor_type, basic_connection<asio::ssl::stream<Socket>>>;
/// Constructor
explicit
basic_connection(executor_type ex, asio::ssl::context& ctx)
: base_type{ex}
, ctx_{&ctx}
, reconn_{ex}
, runner_{ex, {}}
, stream_{std::make_unique<next_layer_type>(ex, ctx)}
{ }
/// Constructor
explicit
basic_connection(asio::io_context& ioc, asio::ssl::context& ctx)
: basic_connection(ioc.get_executor(), ctx)
{ }
/// Returns the associated executor.
auto get_executor() {return stream_->get_executor();}
/// Reset the underlying stream.
void reset_stream()
{
if (stream_->next_layer().is_open()) {
stream_->next_layer().close();
stream_ = std::make_unique<next_layer_type>(stream_->get_executor(), *ctx_);
}
}
/// Returns a reference to the next layer.
auto& next_layer() noexcept { return *stream_; }
/// Returns a const reference to the next layer.
auto const& next_layer() const noexcept { return *stream_; }
/** @brief Establishes a connection with the Redis server asynchronously.
*
* See redis::connection::async_run for more information.
*/
template <
class Logger = logger,
class CompletionToken = asio::default_completion_token_t<executor_type>>
auto
async_run(
config const& cfg = {},
Logger l = Logger{},
CompletionToken token = CompletionToken{})
{
reconn_.set_wait_interval(cfg.reconnect_wait_interval);
runner_.set_config(cfg);
l.set_prefix(runner_.get_config().log_prefix);
return reconn_.async_run(*this, l, std::move(token));
}
/** @brief Executes a command on the Redis server asynchronously.
*
* See redis::connection::async_exec for more information.
*/
template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>>
auto async_exec(
request const& req,
Response& response = ignore,
CompletionToken token = CompletionToken{})
{
return base_type::async_exec(req, response, std::move(token));
}
/** @brief Receives server side pushes asynchronously.
*
* See redis::connection::async_receive for detailed information.
*/
template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>>
auto async_receive(
Response& response = ignore,
CompletionToken token = CompletionToken{})
{
return base_type::async_receive(response, std::move(token));
}
/** @brief Cancel operations.
*
* See redis::connection::cancel for more information.
*/
auto cancel(operation op = operation::all) -> std::size_t
{
reconn_.cancel(op);
runner_.cancel(op);
return base_type::cancel(op);
}
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
/// Sets the maximum size of the read buffer.
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
{ base_type::set_max_buffer_read_size(max_read_size); }
/** @brief Reserve memory on the read and write internal buffers.
*
* This function will call `std::string::reserve` on the
* underlying buffers.
*
* @param read The new capacity of the read buffer.
* @param write The new capacity of the write buffer.
*/
void reserve(std::size_t read, std::size_t write)
{ base_type::reserve(read, write); }
/// Returns true if the connection was canceled.
bool is_cancelled() const noexcept
{ return reconn_.is_cancelled();}
private:
using runner_type = redis::detail::runner<executor_type, detail::handshaker>;
using reconnection_type = redis::detail::basic_reconnection<executor_type>;
using this_type = basic_connection<next_layer_type>;
template <class Logger, class CompletionToken>
auto async_run_one(Logger l, CompletionToken token)
{ return runner_.async_run(*this, l, std::move(token)); }
template <class Logger, class CompletionToken>
auto async_run_impl(Logger l, CompletionToken token)
{ return base_type::async_run_impl(l, std::move(token)); }
template <class, class> friend class redis::detail::connection_base;
template <class, class> friend class redis::detail::read_next_op;
template <class, class> friend struct redis::detail::exec_op;
template <class, class> friend struct redis::detail::receive_op;
template <class, class> friend struct redis::detail::run_op;
template <class, class> friend struct redis::detail::writer_op;
template <class> friend struct redis::detail::reader_op;
template <class> friend struct redis::detail::wait_receive_op;
template <class, class, class> friend struct redis::detail::run_all_op;
template <class, class, class> friend struct redis::detail::reconnection_op;
auto is_open() const noexcept { return stream_->next_layer().is_open(); }
void close()
{ reset_stream(); }
asio::ssl::context* ctx_;
reconnection_type reconn_;
runner_type runner_;
std::unique_ptr<next_layer_type> stream_;
};
/** \brief A connection that uses a boost::asio::ssl::stream<boost::asio::ip::tcp::socket>.
* \ingroup high-level-api
*/
using connection = basic_connection<asio::ssl::stream<asio::ip::tcp::socket>>;
} // boost::redis::ssl
#endif // BOOST_REDIS_SSL_CONNECTION_HPP

View File

@@ -75,14 +75,15 @@ BOOST_AUTO_TEST_CASE(check_health)
net::io_context ioc;
connection conn1{ioc};
conn1.cancel(operation::reconnection);
net::ssl::context ctx{net::ssl::context::tls_client};
connection conn1{ioc, ctx};
request req1;
req1.push("CLIENT", "PAUSE", "10000", "ALL");
config cfg1;
cfg1.health_check_id = "conn1";
cfg1.reconnect_wait_interval = std::chrono::seconds::zero();
error_code res1;
conn1.async_run(cfg1, {}, [&](auto ec) {
std::cout << "async_run 1 completed: " << ec.message() << std::endl;
@@ -93,7 +94,7 @@ BOOST_AUTO_TEST_CASE(check_health)
// It looks like client pause does not work for clients that are
// sending MONITOR. I will therefore open a second connection.
connection conn2{ioc};
connection conn2{ioc, ctx};
config cfg2;
cfg2.health_check_id = "conn2";

View File

@@ -65,7 +65,8 @@ auto echo_session(std::shared_ptr<connection> conn, std::string id, int n) -> ne
auto async_echo_stress() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ex, ctx);
int const sessions = 500;
int const msgs = 1000;

View File

@@ -43,7 +43,8 @@ BOOST_AUTO_TEST_CASE(hello_priority)
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
bool seen1 = false;
bool seen2 = false;
@@ -92,7 +93,8 @@ BOOST_AUTO_TEST_CASE(wrong_response_data_type)
response<int> resp;
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
conn->async_exec(req, resp, [conn](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::redis::error::not_a_number);
@@ -110,7 +112,8 @@ BOOST_AUTO_TEST_CASE(cancel_request_if_not_connected)
req.push("PING");
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
conn->async_exec(req, ignore, [conn](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::redis::error::not_connected);
conn->cancel();

View File

@@ -38,7 +38,8 @@ using namespace std::chrono_literals;
auto implicit_cancel_of_req_written() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ex, ctx);
config cfg;
cfg.health_check_interval = std::chrono::seconds{0};
@@ -80,7 +81,8 @@ BOOST_AUTO_TEST_CASE(test_ignore_implicit_cancel_of_req_written)
BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled)
{
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
request req0;
req0.push("PING");

View File

@@ -40,7 +40,8 @@ auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
auto ex = co_await net::this_coro::executor;
generic_response gresp;
auto conn = std::make_shared<connection>(ex);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ex, ctx);
run(conn);

View File

@@ -38,7 +38,8 @@ BOOST_AUTO_TEST_CASE(no_ignore_error)
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
conn->async_exec(req, ignore, [&](auto ec, auto){
BOOST_CHECK_EQUAL(ec, error::resp3_simple_error);
@@ -64,7 +65,8 @@ BOOST_AUTO_TEST_CASE(has_diagnostic)
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
response<std::string, std::string> resp;
conn->async_exec(req, resp, [&](auto ec, auto){
@@ -106,7 +108,8 @@ BOOST_AUTO_TEST_CASE(resp3_error_in_cmd_pipeline)
response<std::string> resp2;
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
auto c2 = [&](auto ec, auto)
{
@@ -162,7 +165,8 @@ BOOST_AUTO_TEST_CASE(error_in_transaction)
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
conn->async_exec(req, resp, [&](auto ec, auto){
BOOST_TEST(!ec);
@@ -214,7 +218,8 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
req2.push("SUBSCRIBE"); // Wrong command synthax.
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
auto c2 = [&](auto ec, auto)
{

View File

@@ -42,7 +42,8 @@ BOOST_AUTO_TEST_CASE(request_retry_false)
req2.push("PING");
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
net::steady_timer st{ioc};
st.expires_after(std::chrono::seconds{1});
@@ -104,7 +105,8 @@ BOOST_AUTO_TEST_CASE(request_retry_true)
req3.push("QUIT");
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
net::steady_timer st{ioc};
st.expires_after(std::chrono::seconds{1});

View File

@@ -46,7 +46,8 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
auto c3 =[](auto ec, auto...)
{
@@ -86,7 +87,8 @@ BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
BOOST_AUTO_TEST_CASE(push_received1)
{
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
request req;
//req.push("HELLO", 3);
@@ -116,7 +118,8 @@ BOOST_AUTO_TEST_CASE(push_received1)
BOOST_AUTO_TEST_CASE(push_filtered_out)
{
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
request req;
req.push("HELLO", 3);
@@ -183,7 +186,8 @@ auto boost_redis_adapt(response_error_tag&)
BOOST_AUTO_TEST_CASE(test_push_adapter)
{
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
request req;
req.push("HELLO", 3);
@@ -234,7 +238,8 @@ BOOST_AUTO_TEST_CASE(many_subscribers)
req3.push("QUIT");
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
auto c11 =[&](auto ec, auto...)
{

View File

@@ -9,6 +9,7 @@
#define BOOST_TEST_MODULE conn-quit
#include <boost/test/included/unit_test.hpp>
#include <iostream>
#include "common.hpp"
// TODO: Move this to a lib.
#include <boost/redis/src.hpp>
@@ -30,17 +31,15 @@ BOOST_AUTO_TEST_CASE(test_eof_no_error)
req.push("QUIT");
net::io_context ioc;
connection conn{ioc};
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
conn.async_exec(req, ignore, [&](auto ec, auto) {
conn->async_exec(req, ignore, [&](auto ec, auto) {
BOOST_TEST(!ec);
conn.cancel(operation::reconnection);
});
conn.async_run({}, {}, [](auto ec){
BOOST_TEST(!!ec);
conn->cancel(operation::reconnection);
});
run(conn);
ioc.run();
}
@@ -49,8 +48,8 @@ BOOST_AUTO_TEST_CASE(test_async_run_exits)
{
net::io_context ioc;
connection conn{ioc};
conn.cancel(operation::reconnection);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
request req1;
req1.get_config().cancel_on_connection_lost = false;
@@ -75,25 +74,24 @@ BOOST_AUTO_TEST_CASE(test_async_run_exits)
{
std::clog << "c2: " << ec.message() << std::endl;
BOOST_TEST(!ec);
conn.async_exec(req3, ignore, c3);
conn->async_exec(req3, ignore, c3);
};
auto c1 = [&](auto ec, auto)
{
std::cout << "c3: " << ec.message() << std::endl;
std::cout << "c1: " << ec.message() << std::endl;
BOOST_TEST(!ec);
conn.async_exec(req2, ignore, c2);
conn->async_exec(req2, ignore, c2);
};
conn.async_exec(req1, ignore, c1);
conn->async_exec(req1, ignore, c1);
// The healthy checker should not be the cause of async_run
// completing, so we set a long timeout.
// completing, so we disable.
config cfg;
cfg.health_check_interval = 10000s;
conn.async_run({}, {}, [&](auto ec){
BOOST_TEST(!!ec);
});
cfg.health_check_interval = 0s;
cfg.reconnect_wait_interval = 0s;
run(conn, cfg);
ioc.run();
}

View File

@@ -36,7 +36,8 @@ net::awaitable<void> test_reconnect_impl()
request req;
req.push("QUIT");
auto conn = std::make_shared<connection>(ex);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ex, ctx);
run(conn);
int i = 0;
@@ -68,7 +69,8 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
net::steady_timer st{ex};
auto conn = std::make_shared<connection>(ex);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ex, ctx);
error_code ec1, ec3;
request req1;

View File

@@ -36,7 +36,8 @@ using namespace net::experimental::awaitable_operators;
auto async_cancel_run_with_timer() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
connection conn{ex};
net::ssl::context ctx{net::ssl::context::tls_client};
connection conn{ex, ctx};
net::steady_timer st{ex};
st.expires_after(1s);
@@ -61,7 +62,8 @@ auto
async_check_cancellation_not_missed(int n, std::chrono::milliseconds ms) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
connection conn{ex};
net::ssl::context ctx{net::ssl::context::tls_client};
connection conn{ex, ctx};
net::steady_timer timer{ex};

View File

@@ -4,7 +4,7 @@
* accompanying file LICENSE.txt)
*/
#include <boost/redis/ssl/connection.hpp>
#include <boost/redis/connection.hpp>
#define BOOST_TEST_MODULE conn-tls
#include <boost/test/included/unit_test.hpp>
#include <iostream>
@@ -14,7 +14,7 @@
namespace net = boost::asio;
using connection = boost::redis::ssl::connection;
using connection = boost::redis::connection;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
@@ -29,6 +29,7 @@ bool verify_certificate(bool, net::ssl::verify_context&)
BOOST_AUTO_TEST_CASE(ping)
{
config cfg;
cfg.use_ssl = true;
cfg.username = "aedis";
cfg.password = "aedis";
cfg.addr.host = "db.occase.de";
@@ -42,7 +43,7 @@ BOOST_AUTO_TEST_CASE(ping)
response<std::string> resp;
net::io_context ioc;
net::ssl::context ctx{net::ssl::context::sslv23};
net::ssl::context ctx{net::ssl::context::tls_client};
connection conn{ioc, ctx};
conn.next_layer().set_verify_mode(net::ssl::verify_peer);
conn.next_layer().set_verify_callback(verify_certificate);

View File

@@ -40,7 +40,7 @@ auto
receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
std::cout << "uuu" << std::endl;
while (!conn->is_cancelled()) {
while (conn->will_reconnect()) {
std::cout << "dddd" << std::endl;
// Loop reading Redis pushs messages.
for (;;) {
@@ -84,11 +84,12 @@ periodic_task(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto co_main(config cfg) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto ctx = std::make_shared<net::ssl::context>(net::ssl::context::tls_client);
auto conn = std::make_shared<connection>(ex, *ctx);
net::co_spawn(ex, receiver(conn), net::detached);
net::co_spawn(ex, periodic_task(conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, net::consign(net::detached, conn, ctx));
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -38,9 +38,10 @@ BOOST_AUTO_TEST_CASE(resolve_bad_host)
cfg.resolve_timeout = 10h;
cfg.connect_timeout = 10h;
cfg.health_check_interval = 10h;
cfg.reconnect_wait_interval = 0s;
connection conn{ioc};
conn.cancel(operation::reconnection);
net::ssl::context ctx{net::ssl::context::tls_client};
connection conn{ioc, ctx};
conn.async_run(cfg, {}, [](auto ec){
BOOST_TEST(is_host_not_found(ec));
});
@@ -58,9 +59,10 @@ BOOST_AUTO_TEST_CASE(resolve_with_timeout)
cfg.resolve_timeout = 1ms;
cfg.connect_timeout = 1ms;
cfg.health_check_interval = 10h;
cfg.reconnect_wait_interval = 0s;
auto conn = std::make_shared<connection>(ioc);
conn->cancel(operation::reconnection);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
run(conn, cfg);
ioc.run();
}
@@ -75,9 +77,10 @@ BOOST_AUTO_TEST_CASE(connect_bad_port)
cfg.resolve_timeout = 10h;
cfg.connect_timeout = 10s;
cfg.health_check_interval = 10h;
cfg.reconnect_wait_interval = 0s;
auto conn = std::make_shared<connection>(ioc);
conn->cancel(operation::reconnection);
net::ssl::context ctx{net::ssl::context::tls_client};
auto conn = std::make_shared<connection>(ioc, ctx);
run(conn, cfg, net::error::connection_refused);
ioc.run();
}