2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Implements non-member async_run for plain connections.

This function will resolve and connect before calling member async_run.
This commit is contained in:
Marcelo Zimbres
2023-03-12 21:54:32 +01:00
parent cd00047a49
commit fd967204df
43 changed files with 719 additions and 439 deletions

View File

@@ -59,9 +59,9 @@ include_directories(include)
#=======================================================================
add_library(common STATIC
examples/common/common.cpp
examples/common/main.cpp
examples/common/boost_redis.cpp
examples/start.cpp
examples/main.cpp
examples/boost_redis.cpp
)
target_compile_features(common PUBLIC cxx_std_20)
if (MSVC)
@@ -98,12 +98,10 @@ if (MSVC)
target_compile_definitions(cpp17_intro PRIVATE _WIN32_WINNT=0x0601)
endif()
if (NOT MSVC)
add_executable(cpp17_intro_sync examples/cpp17_intro_sync.cpp)
target_compile_features(cpp17_intro_sync PUBLIC cxx_std_17)
add_test(cpp17_intro_sync cpp17_intro_sync)
if (MSVC)
target_compile_options(cpp17_intro_sync PRIVATE /bigobj)
target_compile_definitions(cpp17_intro_sync PRIVATE _WIN32_WINNT=0x0601)
endif()
if (NOT MSVC)
@@ -320,6 +318,14 @@ target_link_libraries(test_conn_check_health common)
add_test(test_conn_check_health test_conn_check_health)
endif()
add_executable(test_run tests/run.cpp)
target_compile_features(test_run PUBLIC cxx_std_17)
add_test(test_run test_run)
if (MSVC)
target_compile_options(test_run PRIVATE /bigobj)
target_compile_definitions(test_run PRIVATE _WIN32_WINNT=0x0601)
endif()
# Install
#=======================================================================

View File

@@ -831,6 +831,9 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php.
* Adds a function that performs health checks, see
`boost::redis::experimental::async_check_health`.
* Adds non-member `async_run` function that resolves, connects and
calls member `async_run` on a connection object.
### v1.4.0-1
* Renames `retry_on_connection_lost` to `cancel_if_unresponded`. (v1.4.1)

View File

@@ -1,68 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include "common.hpp"
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
namespace net = boost::asio;
using namespace net::experimental::awaitable_operators;
using resolver = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::resolver>;
using timer_type = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
using boost::redis::request;
using boost::redis::operation;
namespace
{
auto redir(boost::system::error_code& ec)
{ return net::redirect_error(net::use_awaitable, ec); }
}
auto
connect(
std::shared_ptr<connection> conn,
std::string const& host,
std::string const& port) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
resolver resv{ex};
timer_type timer{ex};
boost::system::error_code ec;
timer.expires_after(std::chrono::seconds{5});
auto const addrs = co_await (resv.async_resolve(host, port) || timer.async_wait(redir(ec)));
if (!ec)
throw std::runtime_error("Resolve timeout");
timer.expires_after(std::chrono::seconds{5});
co_await (net::async_connect(conn->next_layer(), std::get<0>(addrs)) || timer.async_wait(redir(ec)));
if (!ec)
throw std::runtime_error("Connect timeout");
}
auto run(net::awaitable<void> op) -> int
{
try {
net::io_context ioc;
net::co_spawn(ioc, std::move(op), [](std::exception_ptr p) {
if (p)
std::rethrow_exception(p);
});
ioc.run();
return 0;
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
return 1;
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -1,33 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_EXAMPLES_COMMON_HPP
#define BOOST_REDIS_EXAMPLES_COMMON_HPP
#include <iostream>
#include <boost/asio.hpp>
#include <boost/redis.hpp>
#include <memory>
#include <iostream>
#include <vector>
#include <map>
#include <set>
#include <string>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
auto
connect(
std::shared_ptr<connection> conn,
std::string const& host,
std::string const& port) -> boost::asio::awaitable<void>;
auto run(boost::asio::awaitable<void> op) -> int;
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
#endif // BOOST_REDIS_EXAMPLES_COMMON_HPP

View File

@@ -10,16 +10,12 @@
#include <boost/redis/src.hpp>
namespace net = boost::asio;
namespace redis = boost::redis;
using redis::operation;
using redis::request;
using redis::response;
using redis::ignore_t;
void log(boost::system::error_code const& ec, char const* prefix)
{
std::clog << prefix << ec.message() << std::endl;
}
using boost::redis::connection;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::async_run;
using namespace std::chrono_literals;
auto main(int argc, char * argv[]) -> int
{
@@ -36,59 +32,22 @@ auto main(int argc, char * argv[]) -> int
request req;
req.push("HELLO", 3);
req.push("PING", "Hello world");
req.push("QUIT");
// The response.
response<ignore_t, std::string, ignore_t> resp;
response<ignore_t, std::string> resp;
net::io_context ioc;
connection conn{ioc};
// IO objects.
net::ip::tcp::resolver resv{ioc};
redis::connection conn{ioc};
async_run(conn, host, port, 10s, 10s, [&](auto){
conn.cancel();
});
// Resolve endpoints.
net::ip::tcp::resolver::results_type endpoints;
// async_run callback.
auto on_run = [](auto ec)
{
if (ec)
return log(ec, "on_run: ");
};
// async_exec callback.
auto on_exec = [&](auto ec, auto)
{
if (ec) {
conn.cancel(operation::run);
return log(ec, "on_exec: ");
}
std::cout << "PING: " << std::get<1>(resp).value() << std::endl;
};
// Connect callback.
auto on_connect = [&](auto ec, auto)
{
if (ec)
return log(ec, "on_connect: ");
conn.async_run(on_run);
conn.async_exec(req, resp, on_exec);
};
// Resolve callback.
auto on_resolve = [&](auto ec, auto const& addrs)
{
if (ec)
return log(ec, "on_resolve: ");
endpoints = addrs;
net::async_connect(conn.next_layer(), endpoints, on_connect);
};
resv.async_resolve(host, port, on_resolve);
conn.async_exec(req, resp, [&](auto ec, auto){
if (!ec)
std::cout << "PING: " << std::get<1>(resp).value() << std::endl;
conn.cancel();
});
ioc.run();
return 0;
@@ -96,7 +55,6 @@ auto main(int argc, char * argv[]) -> int
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
return 1;
}

View File

@@ -6,19 +6,25 @@
#include <tuple>
#include <string>
#include <chrono>
#include <thread>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/redis.hpp>
#include <boost/redis/check_health.hpp>
// Include this in no more than one .cpp file.
#include <boost/redis/src.hpp>
namespace net = boost::asio;
using connection = boost::redis::connection;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::async_run;
using boost::redis::async_check_health;
using namespace std::chrono_literals;
template <class Response>
auto exec(std::shared_ptr<connection> conn, request const& req, Response& resp)
@@ -29,9 +35,6 @@ auto exec(std::shared_ptr<connection> conn, request const& req, Response& resp)
(net::use_future).get();
}
auto logger = [](auto const& ec)
{ std::clog << "Run: " << ec.message() << std::endl; };
auto main(int argc, char * argv[]) -> int
{
try {
@@ -47,17 +50,17 @@ auto main(int argc, char * argv[]) -> int
auto conn = std::make_shared<connection>(ioc);
// Resolves the address
net::ip::tcp::resolver resv{ioc};
auto const res = resv.resolve(host, port);
// Starts a thread that will can io_context::run on which the
// connection will run.
std::thread t{[&ioc, conn, host, port]() {
async_run(*conn, host, port, 10s, 10s, [conn](auto){
conn->cancel();
});
// Connect to Redis
net::connect(conn->next_layer(), res);
async_check_health(*conn, "Boost.Redis", 2s, [conn](auto) {
conn->cancel();
});
// Starts a thread that will can io_context::run on which
// the connection will run.
std::thread t{[conn, &ioc]() {
conn->async_run(logger);
ioc.run();
}};

View File

@@ -11,17 +11,17 @@ namespace net = boost::asio;
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/redis.hpp>
#include <boost/redis/experimental/run.hpp>
#include <boost/redis/check_health.hpp>
#include <unistd.h>
#include "common/common.hpp"
using namespace net::experimental::awaitable_operators;
using stream_descriptor = net::use_awaitable_t<>::as_default_on_t<net::posix::stream_descriptor>;
using signal_set = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::experimental::async_check_health;
using boost::redis::async_check_health;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
// Chat over Redis pubsub. To test, run this program from multiple
// terminals and type messages to stdin.
@@ -60,8 +60,7 @@ auto co_main(std::string host, std::string port) -> net::awaitable<void>
req.push("HELLO", 3);
req.push("SUBSCRIBE", "chat-channel");
co_await connect(conn, host, port);
co_await ((conn->async_run() || publisher(stream, conn) || receiver(conn) ||
co_await ((async_run(*conn, host, port) || publisher(stream, conn) || receiver(conn) ||
async_check_health(*conn) || sig.async_wait()) &&
conn->async_exec(req));
}

View File

@@ -6,19 +6,18 @@
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/redis.hpp>
#include <map>
#include <vector>
#include "common/common.hpp"
#include <iostream>
namespace net = boost::asio;
namespace redis = boost::redis;
using namespace net::experimental::awaitable_operators;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
void print(std::map<std::string, std::string> const& cont)
{
@@ -34,8 +33,7 @@ void print(std::vector<int> const& cont)
auto run(std::shared_ptr<connection> conn, std::string host, std::string port) -> net::awaitable<void>
{
co_await connect(conn, host, port);
co_await conn->async_run();
co_await async_run(*conn, host, port);
}
// Stores the content of some STL containers in Redis.

View File

@@ -8,8 +8,7 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/redis.hpp>
#include <boost/redis/experimental/run.hpp>
#include "common/common.hpp"
#include <boost/redis/check_health.hpp>
namespace net = boost::asio;
using namespace net::experimental::awaitable_operators;
@@ -18,7 +17,9 @@ using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::accep
using signal_set = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using boost::redis::request;
using boost::redis::response;
using boost::redis::experimental::async_check_health;
using boost::redis::async_check_health;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> net::awaitable<void>
{
@@ -55,8 +56,7 @@ auto co_main(std::string host, std::string port) -> net::awaitable<void>
request req;
req.push("HELLO", 3);
co_await connect(conn, host, port);
co_await ((conn->async_run() || listener(conn) || async_check_health(*conn) ||
co_await ((async_run(*conn, host, port) || listener(conn) || async_check_health(*conn) ||
sig.async_wait()) && conn->async_exec(req));
}

View File

@@ -7,25 +7,22 @@
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/redis.hpp>
#include "common/common.hpp"
#include <iostream>
namespace net = boost::asio;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
auto run(std::shared_ptr<connection> conn, std::string host, std::string port) -> net::awaitable<void>
{
// From examples/common.hpp to avoid vebosity
co_await connect(conn, host, port);
// async_run coordinate read and write operations.
co_await conn->async_run();
co_await async_run(*conn, host, port);
// Cancel pending operations, if any.
conn->cancel(operation::exec);
conn->cancel(operation::receive);
conn->cancel();
}
// Called from the main function (see main.cpp)

View File

@@ -4,17 +4,19 @@
* accompanying file LICENSE.txt)
*/
#include <iostream>
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/redis.hpp>
#include "common/common.hpp"
namespace net = boost::asio;
using namespace net::experimental::awaitable_operators;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
// Called from the main function (see main.cpp)
auto co_main(std::string host, std::string port) -> net::awaitable<void>
@@ -27,9 +29,8 @@ auto co_main(std::string host, std::string port) -> net::awaitable<void>
response<ignore_t, std::string, ignore_t> resp;
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
co_await connect(conn, host, port);
co_await (conn->async_run() || conn->async_exec(req, resp));
connection conn{co_await net::this_coro::executor};
co_await (async_run(conn, host, port) || conn.async_exec(req, resp));
std::cout << "PING: " << std::get<1>(resp).value() << std::endl;
} catch (std::exception const& e) {

View File

@@ -12,7 +12,6 @@
#include <boost/describe.hpp>
#include <string>
#include <iostream>
#include "common/common.hpp"
#include "json.hpp"
// Include this in no more than one .cpp file.
@@ -25,6 +24,8 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::operation;
using boost::redis::ignore_t;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
// Struct that will be stored in Redis using json serialization.
struct user {
@@ -45,8 +46,7 @@ void boost_redis_from_bulk(user& u, std::string_view sv, boost::system::error_co
auto run(std::shared_ptr<connection> conn, std::string host, std::string port) -> net::awaitable<void>
{
co_await connect(conn, host, port);
co_await conn->async_run();
co_await async_run(*conn, host, port);
}
net::awaitable<void> co_main(std::string host, std::string port)

View File

@@ -8,7 +8,6 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/redis.hpp>
#include <iostream>
#include "common/common.hpp"
#include "protobuf.hpp"
// See the definition in person.proto. This header is automatically
@@ -21,6 +20,8 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::operation;
using boost::redis::ignore_t;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
// The protobuf type described in examples/person.proto
using tutorial::person;
@@ -42,8 +43,7 @@ using tutorial::boost_redis_from_bulk;
auto run(std::shared_ptr<connection> conn, std::string host, std::string port) -> net::awaitable<void>
{
co_await connect(conn, host, port);
co_await conn->async_run();
co_await async_run(*conn, host, port);
}
net::awaitable<void> co_main(std::string host, std::string port)

View File

@@ -4,19 +4,20 @@
* accompanying file LICENSE.txt)
*/
#include <iostream>
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/redis.hpp>
#include "common/common.hpp"
namespace net = boost::asio;
using namespace net::experimental::awaitable_operators;
using endpoints = net::ip::tcp::resolver::results_type;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
auto redir(boost::system::error_code& ec)
{ return net::redirect_error(net::use_awaitable, ec); }
@@ -40,8 +41,7 @@ auto resolve_master_address(std::vector<address> const& endpoints) -> net::await
response<std::optional<std::array<std::string, 2>>, ignore_t> addr;
for (auto ep : endpoints) {
boost::system::error_code ec;
co_await connect(conn, ep.host, ep.port);
co_await (conn->async_run() && conn->async_exec(req, addr, redir(ec)));
co_await (async_run(*conn, ep.host, ep.port) && conn->async_exec(req, addr, redir(ec)));
conn->reset_stream();
if (std::get<0>(addr))
co_return address{std::get<0>(addr).value().value().at(0), std::get<0>(addr).value().value().at(1)};

View File

@@ -4,20 +4,21 @@
* accompanying file LICENSE.txt)
*/
#include <iostream>
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/redis.hpp>
#include <boost/redis/experimental/run.hpp>
#include "common/common.hpp"
#include <boost/redis/check_health.hpp>
namespace net = boost::asio;
using namespace net::experimental::awaitable_operators;
using steady_timer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
using boost::redis::request;
using boost::redis::async_run;
using boost::redis::generic_response;
using boost::redis::experimental::async_check_health;
using boost::redis::async_check_health;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
/* This example will subscribe and read pushes indefinitely.
*
@@ -57,9 +58,7 @@ auto co_main(std::string host, std::string port) -> net::awaitable<void>
// The loop will reconnect on connection lost. To exit type Ctrl-C twice.
for (;;) {
co_await connect(conn, host, port);
co_await ((conn->async_run() || async_check_health(*conn) || receiver(conn)) && conn->async_exec(req));
co_await ((async_run(*conn, host, port) || async_check_health(*conn) || receiver(conn)) && conn->async_exec(req));
conn->reset_stream();
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait();

View File

@@ -8,7 +8,7 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include "common.hpp"
#include "start.hpp"
extern boost::asio::awaitable<void> co_main(std::string, std::string);
@@ -22,7 +22,7 @@ auto main(int argc, char * argv[]) -> int
port = argv[2];
}
return run(co_main(host, port));
return start(co_main(host, port));
}
#else // defined(BOOST_ASIO_HAS_CO_AWAIT)

33
examples/start.cpp Normal file
View File

@@ -0,0 +1,33 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include "start.hpp"
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <iostream>
namespace net = boost::asio;
auto start(net::awaitable<void> op) -> int
{
try {
net::io_context ioc;
net::co_spawn(ioc, std::move(op), [](std::exception_ptr p) {
if (p)
std::rethrow_exception(p);
});
ioc.run();
return 0;
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
return 1;
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

17
examples/start.hpp Normal file
View File

@@ -0,0 +1,17 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_EXAMPLES_START_HPP
#define BOOST_REDIS_EXAMPLES_START_HPP
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
auto start(boost::asio::awaitable<void> op) -> int;
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
#endif // BOOST_REDIS_EXAMPLES_START_HPP

View File

@@ -11,6 +11,7 @@
#include <boost/redis/connection.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/run.hpp>
/** @defgroup high-level-api Reference
*

View File

@@ -4,17 +4,22 @@
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_RUN_HPP
#define BOOST_REDIS_RUN_HPP
#ifndef BOOST_REDIS_CHECK_HEALTH_HPP
#define BOOST_REDIS_CHECK_HEALTH_HPP
// Has to included before promise.hpp to build on msvc.
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/detail/read_ops.hpp>
#include <boost/asio/experimental/promise.hpp>
#include <boost/asio/experimental/use_promise.hpp>
#include <boost/asio/steady_timer.hpp>
#include <memory>
#include <chrono>
#include <optional>
namespace boost::redis::experimental {
namespace boost::redis {
namespace detail {
template <class Connection>
@@ -23,14 +28,17 @@ private:
using executor_type = typename Connection::executor_type;
struct state {
using clock_type = std::chrono::steady_clock;
using clock_traits_type = asio::wait_traits<clock_type>;
using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
using promise_type = asio::experimental::promise<void(system::error_code, std::size_t), executor_type>;
using timer_type =
asio::basic_waitable_timer<
std::chrono::steady_clock,
asio::wait_traits<std::chrono::steady_clock>,
executor_type>;
timer_type timer_;
request req_;
generic_response resp_;
redis::request req_;
redis::generic_response resp_;
std::optional<promise_type> prom_;
std::chrono::steady_clock::duration interval_;
@@ -124,6 +132,6 @@ async_check_health(
>(detail::check_health_op<Connection>{conn, msg, interval}, token, conn);
}
} // boost::redis::experimental
} // boost::redis
#endif // BOOST_REDIS_RUN_HPP
#endif // BOOST_REDIS_CHECK_HEALTH_HPP

View File

@@ -201,15 +201,15 @@ public:
* @li `operation::exec`: Cancels operations started with
* `async_exec`. Affects only requests that haven't been written
* yet.
* @li operation::run: Cancels the `async_run` operation. Notice
* that the preferred way to close a connection is to send a
* [QUIT](https://redis.io/commands/quit/) command to the server.
* @li operation::run: Cancels the `async_run` operation.
* @li operation::receive: Cancels any ongoing calls to * `async_receive`.
* @li operation::all: Cancels all operations listed above. This
* is the default argument.
*
* @param op: The operation to be cancelled.
* @returns The number of operations that have been canceled.
*/
auto cancel(operation op) -> std::size_t
auto cancel(operation op = operation::all) -> std::size_t
{ return base_type::cancel(op); }
/// Sets the maximum size of the read buffer.

View File

@@ -52,7 +52,7 @@ public:
auto get_executor() {return writer_timer_.get_executor();}
auto cancel(operation op) -> std::size_t
auto cancel_impl(operation op) -> std::size_t
{
switch (op) {
case operation::exec:
@@ -77,6 +77,19 @@ public:
}
}
auto cancel(operation op) -> std::size_t
{
if (op == operation::all) {
std::size_t ret = 0;
ret += cancel_impl(operation::run);
ret += cancel_impl(operation::receive);
ret += cancel_impl(operation::exec);
return ret;
}
return cancel_impl(op);
}
auto cancel_unwritten_requests() -> std::size_t
{
auto f = [](auto const& ptr)

View File

@@ -0,0 +1,225 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_RUNNER_HPP
#define BOOST_REDIS_RUNNER_HPP
// Has to included before promise.hpp to build on msvc.
#include <boost/redis/detail/read_ops.hpp>
#include <boost/redis/error.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/consign.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <memory>
#include <chrono>
namespace boost::redis::detail {
template <class Runner>
struct resolve_op {
Runner* runner = nullptr;
std::string_view host;
std::string_view port;
asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, system::error_code ec1 = {}
, asio::ip::tcp::resolver::results_type res = {}
, system::error_code ec2 = {})
{
BOOST_ASIO_CORO_REENTER (coro)
{
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
[this](auto token) { return runner->resv_.async_resolve(host.data(), port.data(), token);},
[this](auto token) { return runner->timer_.async_wait(token);}
).async_wait(
asio::experimental::wait_for_one(),
std::move(self));
if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
switch (order[0]) {
case 0: {
// Resolver completed first.
runner->endpoints_ = res;
self.complete(ec1);
} break;
case 1: {
if (ec2) {
// Timer completed first with error, perhaps a
// cancellation going on.
self.complete(ec2);
} else {
// Timer completed first without an error, this is a
// resolve timeout.
self.complete(error::resolve_timeout);
}
} break;
default: BOOST_ASSERT(false);
}
}
}
};
template <class Runner, class Stream>
struct connect_op {
Runner* runner = nullptr;
Stream* stream = nullptr;
asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, system::error_code ec1 = {}
, asio::ip::tcp::endpoint const& = {}
, system::error_code ec2 = {})
{
BOOST_ASIO_CORO_REENTER (coro)
{
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
[this](auto token)
{
auto f = [](system::error_code const&, auto const&) { return true; };
return asio::async_connect(*stream, runner->endpoints_, f, token);
},
[this](auto token) { return runner->timer_.async_wait(token);}
).async_wait(
asio::experimental::wait_for_one(),
std::move(self));
if (is_cancelled(self)) {
self.complete(asio::error::operation_aborted);
return;
}
switch (order[0]) {
case 0: {
self.complete(ec1);
} break;
case 1:
{
if (ec2) {
self.complete(ec2);
} else {
self.complete(error::connect_timeout);
}
} break;
default: BOOST_ASSERT(false);
}
}
}
};
template <class Runner, class Connection>
struct runner_op {
Runner* runner = nullptr;
Connection* conn = nullptr;
std::string_view host;
std::string_view port;
std::chrono::steady_clock::duration resolve_timeout;
std::chrono::steady_clock::duration connect_timeout;
asio::coroutine coro{};
template <class Self>
void operator()(Self& self, system::error_code ec = {})
{
BOOST_ASIO_CORO_REENTER (coro)
{
runner->timer_.expires_after(resolve_timeout);
BOOST_ASIO_CORO_YIELD
runner->async_resolve(host, port, std::move(self));
AEDIS_CHECK_OP0(;)
runner->timer_.expires_after(connect_timeout);
BOOST_ASIO_CORO_YIELD
runner->async_connect(conn->next_layer(), std::move(self));
AEDIS_CHECK_OP0(;)
BOOST_ASIO_CORO_YIELD
conn->async_run(std::move(self));
AEDIS_CHECK_OP0(;)
self.complete({});
}
}
};
template <class Executor>
class runner {
public:
runner(Executor ex): resv_{ex}, timer_{ex} {}
template <class CompletionToken>
auto
async_resolve(
std::string_view host,
std::string_view port,
CompletionToken&& token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(resolve_op<runner>{this, host, port}, token, resv_);
}
template <class Stream, class CompletionToken>
auto async_connect(Stream& stream, CompletionToken&& token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(connect_op<runner, Stream>{this, &stream}, token, resv_);
}
template <class Connection, class CompletionToken>
auto
async_run(
Connection& conn,
std::string_view host,
std::string_view port,
std::chrono::steady_clock::duration resolve_timeout,
std::chrono::steady_clock::duration connect_timeout,
CompletionToken&& token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code)
>(runner_op<runner, Connection>{this, &conn, host, port, resolve_timeout, connect_timeout}, token, resv_);
}
private:
using resolver_type = asio::ip::basic_resolver<asio::ip::tcp, Executor>;
using timer_type =
asio::basic_waitable_timer<
std::chrono::steady_clock,
asio::wait_traits<std::chrono::steady_clock>,
Executor>;
template <class, class> friend struct runner_op;
template <class, class> friend struct connect_op;
template <class> friend struct resolve_op;
resolver_type resv_;
timer_type timer_;
asio::ip::tcp::resolver::results_type endpoints_;
};
} // boost::redis::detail
#endif // BOOST_REDIS_RUNNER_HPP

View File

@@ -63,6 +63,12 @@ enum class error
/// There is no stablished connection.
not_connected,
/// Resolve timeout
resolve_timeout,
/// Connect timeout
connect_timeout,
};
/** \internal

View File

@@ -38,6 +38,8 @@ struct error_category_impl : system::error_category {
case error::not_a_double: return "Not a double.";
case error::resp3_null: return "Got RESP3 null.";
case error::not_connected: return "Not connected.";
case error::resolve_timeout: return "Resolve timeout.";
case error::connect_timeout: return "Connect timeout.";
default: BOOST_ASSERT(false); return "Boost.Redis error.";
}
}

View File

@@ -22,6 +22,8 @@ enum class operation {
run,
/// Refers to `connection::async_receive` operations.
receive,
/// Refers to all operations.
all,
};
} // boost::redis

View File

@@ -0,0 +1,63 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_RUN_HPP
#define BOOST_REDIS_RUN_HPP
// Has to included before promise.hpp to build on msvc.
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/connection.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/consign.hpp>
#include <memory>
#include <chrono>
namespace boost::redis {
/** @brief Call async_run on the connection.
* @ingroup high-level-api
*
* This is a facility function that
* 1. Resoves the endpoint.
* 2. Connects to one of the endpoints from 1.
* 3. Calls async_run on the underlying connection.
*
* @param host Redis host to connect to.
* @param port Redis port to connect to.
* @param resolve_timeout Time the resolve operation is allowed to take.
* @param connect_timeout Time the connect operation is allowed to take.
* @param token Completion token.
*/
template <
class Socket,
class CompletionToken = asio::default_completion_token_t<typename Socket::executor_type>
>
auto
async_run(
basic_connection<Socket>& conn,
std::string_view host = "127.0.0.1",
std::string_view port = "6379",
std::chrono::steady_clock::duration resolve_timeout = std::chrono::seconds{10},
std::chrono::steady_clock::duration connect_timeout = std::chrono::seconds{10},
CompletionToken token = CompletionToken{})
{
using executor_type = typename Socket::executor_type;
using runner_type = detail::runner<executor_type>;
auto runner = std::make_shared<runner_type>(conn.get_executor());
return
runner->async_run(
conn,
host,
port,
resolve_timeout,
connect_timeout,
asio::consign(std::move(token), runner));
}
} // boost::redis
#endif // BOOST_REDIS_RUN_HPP

View File

@@ -122,7 +122,7 @@ public:
*
* See redis::connection::cancel for more information.
*/
auto cancel(operation op) -> std::size_t
auto cancel(operation op = operation::all) -> std::size_t
{ return base_type::cancel(op); }
auto& lowest_layer() noexcept { return stream_.lowest_layer(); }

View File

@@ -1,22 +1,9 @@
#pragma once
#include <boost/asio.hpp>
#include <chrono>
namespace net = boost::asio;
using endpoints = net::ip::tcp::resolver::results_type;
auto
resolve(
std::string const& host = "127.0.0.1",
std::string const& port = "6379") -> endpoints
{
net::io_context ioc;
net::ip::tcp::resolver resv{ioc};
return resv.resolve(host, port);
}
#ifdef BOOST_ASIO_HAS_CO_AWAIT
namespace net = boost::asio;
inline
auto redir(boost::system::error_code& ec)
{ return net::redirect_error(net::use_awaitable, ec); }

View File

@@ -12,7 +12,7 @@
#include <boost/test/included/unit_test.hpp>
#include <boost/redis.hpp>
#include <boost/redis/experimental/run.hpp>
#include <boost/redis/check_health.hpp>
#include <boost/redis/src.hpp>
#include "common.hpp"
@@ -24,7 +24,9 @@ using boost::redis::request;
using boost::redis::ignore;
using boost::redis::operation;
using boost::redis::generic_response;
using boost::redis::experimental::async_check_health;
using boost::redis::async_check_health;
using boost::redis::async_run;
using namespace std::chrono_literals;
std::chrono::seconds const interval{1};
@@ -74,14 +76,11 @@ BOOST_AUTO_TEST_CASE(check_health)
{
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
// It looks like client pause does not work for clients that are
// sending MONITOR. I will therefore open a second connection.
connection conn2{ioc};
net::connect(conn2.next_layer(), endpoints);
std::string const msg = "test-check-health";
@@ -108,12 +107,12 @@ BOOST_AUTO_TEST_CASE(check_health)
generic_response resp;
push_callback{&conn, &conn2, &resp, &req2}(); // Starts reading pushes.
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
std::cout << "B" << std::endl;
BOOST_TEST(!!ec);
});
conn2.async_run([](auto ec){
async_run(conn2, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
std::cout << "C" << std::endl;
BOOST_TEST(!!ec);
});

View File

@@ -13,7 +13,7 @@
#include <boost/redis.hpp>
#include <boost/redis/src.hpp>
#include "common.hpp"
#include "../examples/common/common.hpp"
#include "../examples/start.hpp"
namespace net = boost::asio;
using error_code = boost::system::error_code;
@@ -22,6 +22,9 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using namespace std::chrono_literals;
auto push_consumer(std::shared_ptr<connection> conn, int expected) -> net::awaitable<void>
{
@@ -74,13 +77,12 @@ auto async_echo_stress() -> net::awaitable<void>
for (int i = 0; i < sessions; ++i)
net::co_spawn(ex, echo_session(conn, std::to_string(i), msgs), net::detached);
co_await connect(conn, "127.0.0.1", "6379");
co_await conn->async_run();
co_await async_run(*conn);
}
BOOST_AUTO_TEST_CASE(echo_stress)
{
run(async_echo_stress());
start(async_echo_stress());
}
#else

View File

@@ -28,6 +28,8 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::async_run;
using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(hello_priority)
{
@@ -47,9 +49,7 @@ BOOST_AUTO_TEST_CASE(hello_priority)
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
bool seen1 = false;
bool seen2 = false;
@@ -77,7 +77,7 @@ BOOST_AUTO_TEST_CASE(hello_priority)
seen3 = true;
});
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_TEST(!ec);
});
@@ -94,14 +94,12 @@ BOOST_AUTO_TEST_CASE(wrong_response_data_type)
response<ignore_t, int> resp;
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req, resp, [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::redis::error::not_a_number);
});
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted);
});

View File

@@ -14,7 +14,7 @@
#include <boost/redis.hpp>
#include <boost/redis/src.hpp>
#include "common.hpp"
#include "../examples/common/common.hpp"
#include "../examples/start.hpp"
// NOTE1: Sends hello separately. I have observed that if hello and
// blpop are sent toguether, Redis will send the response of hello
@@ -30,6 +30,9 @@ using boost::redis::response;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using namespace std::chrono_literals;
auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
{
@@ -37,9 +40,8 @@ auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
generic_response gresp;
auto conn = std::make_shared<connection>(ex);
co_await connect(conn, "127.0.0.1", "6379");
conn->async_run([conn](auto ec) {
async_run(*conn, "127.0.0.1", "6379", 10s, 10s, [conn](auto ec) {
std::cout << "async_run: " << ec.message() << std::endl;
BOOST_TEST(!ec);
});
@@ -85,11 +87,10 @@ auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
co_await connect(conn, "127.0.0.1", "6379");
// Calls async_run separately from the group of ops below to avoid
// having it canceled when the timer fires.
conn->async_run([conn](auto ec) {
async_run(*conn, "127.0.0.1", "6379", 10s, 10s, [conn](auto ec) {
BOOST_CHECK_EQUAL(ec, net::error::basic_errors::operation_aborted);
});
@@ -118,20 +119,18 @@ auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>
BOOST_AUTO_TEST_CASE(test_ignore_explicit_cancel_of_req_written)
{
run(async_ignore_explicit_cancel_of_req_written());
start(async_ignore_explicit_cancel_of_req_written());
}
BOOST_AUTO_TEST_CASE(test_ignore_implicit_cancel_of_req_written)
{
run(ignore_implicit_cancel_of_req_written());
start(ignore_implicit_cancel_of_req_written());
}
BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled)
{
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
request req0;
req0.push("HELLO", 3);
@@ -156,7 +155,7 @@ BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled)
conn.async_exec(req0, ignore, c0);
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, net::error::basic_errors::operation_aborted);
});

View File

@@ -27,6 +27,8 @@ using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::error;
using boost::redis::async_run;
using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(no_ignore_error)
{
@@ -37,16 +39,13 @@ BOOST_AUTO_TEST_CASE(no_ignore_error)
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req, ignore, [&](auto ec, auto){
BOOST_CHECK_EQUAL(ec, error::resp3_simple_error);
conn.cancel(redis::operation::run);
});
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted);
});
@@ -66,9 +65,7 @@ BOOST_AUTO_TEST_CASE(has_diagnostic)
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
response<std::string, std::string> resp;
conn.async_exec(req, resp, [&](auto ec, auto){
@@ -87,8 +84,7 @@ BOOST_AUTO_TEST_CASE(has_diagnostic)
conn.cancel(redis::operation::run);
});
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted);
});
@@ -111,9 +107,7 @@ BOOST_AUTO_TEST_CASE(resp3_error_in_cmd_pipeline)
response<std::string> resp2;
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
auto c2 = [&](auto ec, auto)
{
@@ -139,8 +133,7 @@ BOOST_AUTO_TEST_CASE(resp3_error_in_cmd_pipeline)
};
conn.async_exec(req1, resp1, c1);
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted);
});
@@ -171,9 +164,7 @@ BOOST_AUTO_TEST_CASE(error_in_transaction)
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req, resp, [&](auto ec, auto){
BOOST_TEST(!ec);
@@ -205,8 +196,7 @@ BOOST_AUTO_TEST_CASE(error_in_transaction)
conn.cancel(redis::operation::run);
});
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted);
});
@@ -226,9 +216,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
req2.push("SUBSCRIBE"); // Wrong command synthax.
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
auto c2 = [&](auto ec, auto)
{
@@ -258,8 +246,7 @@ BOOST_AUTO_TEST_CASE(subscriber_wrong_syntax)
};
conn.async_receive(gresp, c3);
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
std::cout << "async_run" << std::endl;
BOOST_CHECK_EQUAL(ec, boost::asio::error::basic_errors::operation_aborted);
});

View File

@@ -23,6 +23,8 @@ using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::async_run;
using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(request_retry_false)
{
@@ -53,9 +55,6 @@ BOOST_AUTO_TEST_CASE(request_retry_false)
conn.cancel(operation::run);
});
auto const endpoints = resolve();
net::connect(conn.next_layer(), endpoints);
auto c2 = [&](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
};
@@ -72,7 +71,7 @@ BOOST_AUTO_TEST_CASE(request_retry_false)
conn.async_exec(req0, ignore, c0);
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
@@ -112,9 +111,6 @@ BOOST_AUTO_TEST_CASE(request_retry_true)
conn.cancel(boost::redis::operation::run);
});
auto const endpoints = resolve();
net::connect(conn.next_layer(), endpoints);
auto c3 = [&](auto ec, auto){
BOOST_TEST(!ec);
};
@@ -136,14 +132,13 @@ BOOST_AUTO_TEST_CASE(request_retry_true)
conn.async_exec(req0, ignore, c0);
conn.async_run([&](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){
// The first cacellation.
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
conn.reset_stream();
// Reconnects and runs again to test req3.
net::connect(conn.next_layer(), endpoints);
conn.async_run([&](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
std::cout << ec.message() << std::endl;
BOOST_TEST(!ec);
});

View File

@@ -28,13 +28,99 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::async_run;
using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
{
request req1;
req1.push("HELLO", 3);
req1.push("PING", "Message1");
request req2;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.push("PING", "Message2");
req3.push("QUIT");
net::io_context ioc;
connection conn{ioc};
auto c3 =[](auto ec, auto...)
{
BOOST_TEST(!!ec);
};
auto c2 =[&](auto ec, auto...)
{
BOOST_TEST(!ec);
conn.async_exec(req3, ignore, c3);
};
auto c1 =[&](auto ec, auto...)
{
BOOST_TEST(!ec);
conn.async_exec(req2, ignore, c2);
};
conn.async_exec(req1, ignore, c1);
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){
BOOST_TEST(!ec);
conn.cancel(operation::receive);
});
bool push_received = false;
conn.async_receive(ignore, [&](auto ec, auto){
std::cout << "async_receive" << std::endl;
BOOST_TEST(!ec);
conn.cancel(operation::run);
push_received = true;
});
ioc.run();
BOOST_TEST(push_received);
}
BOOST_AUTO_TEST_CASE(push_received1)
{
net::io_context ioc;
connection conn{ioc};
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
conn.async_exec(req, ignore, [](auto ec, auto){
std::cout << "async_exec" << std::endl;
BOOST_TEST(!ec);
});
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){
std::cout << "async_run: " << ec.message() << std::endl;
conn.cancel(operation::receive);
});
bool push_received = false;
conn.async_receive(ignore, [&](auto ec, auto){
std::cout << "async_receive" << std::endl;
BOOST_TEST(!ec);
conn.cancel(operation::run);
push_received = true;
});
ioc.run();
BOOST_TEST(push_received);
}
BOOST_AUTO_TEST_CASE(push_filtered_out)
{
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
request req;
req.push("HELLO", 3);
@@ -51,7 +137,7 @@ BOOST_AUTO_TEST_CASE(push_filtered_out)
BOOST_TEST(!ec);
});
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){
BOOST_TEST(!ec);
});
@@ -101,9 +187,7 @@ auto boost_redis_adapt(response_error_tag&)
BOOST_AUTO_TEST_CASE(test_push_adapter)
{
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
request req;
req.push("HELLO", 3);
@@ -119,7 +203,7 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
BOOST_CHECK_EQUAL(ec, net::experimental::error::channel_errors::channel_cancelled);
});
conn.async_run([](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
@@ -131,94 +215,9 @@ BOOST_AUTO_TEST_CASE(test_push_adapter)
net::awaitable<void> push_consumer3(connection& conn)
{
for (;;)
for (;;) {
co_await conn.async_receive(ignore, net::use_awaitable);
}
BOOST_AUTO_TEST_CASE(push_received1)
{
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
req.push("QUIT");
conn.async_exec(req, ignore, [](auto ec, auto){
BOOST_TEST(!ec);
});
conn.async_run([&](auto ec){
BOOST_TEST(!ec);
conn.cancel(operation::receive);
});
bool push_received = false;
net::co_spawn(
ioc.get_executor(),
push_consumer1(conn, push_received),
net::detached);
ioc.run();
BOOST_TEST(push_received);
}
BOOST_AUTO_TEST_CASE(receives_push_waiting_resps)
{
request req1;
req1.push("HELLO", 3);
req1.push("PING", "Message1");
request req2;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.push("PING", "Message2");
req3.push("QUIT");
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
auto c3 =[](auto ec, auto...)
{
BOOST_TEST(!ec);
};
auto c2 =[&](auto ec, auto...)
{
BOOST_TEST(!ec);
conn.async_exec(req3, ignore, c3);
};
auto c1 =[&](auto ec, auto...)
{
BOOST_TEST(!ec);
conn.async_exec(req2, ignore, c2);
};
conn.async_exec(req1, ignore, c1);
conn.async_run([&](auto ec) {
BOOST_TEST(!ec);
conn.cancel(operation::receive);
});
bool push_received = false;
net::co_spawn(
ioc.get_executor(),
push_consumer1(conn, push_received),
net::detached);
ioc.run();
BOOST_TEST(push_received);
}
}
BOOST_AUTO_TEST_CASE(many_subscribers)
@@ -240,12 +239,11 @@ BOOST_AUTO_TEST_CASE(many_subscribers)
req3.push("QUIT");
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
auto c11 =[&](auto ec, auto...)
{
std::cout << "quit sent" << std::endl;
BOOST_TEST(!ec);
};
auto c10 =[&](auto ec, auto...)
@@ -306,7 +304,7 @@ BOOST_AUTO_TEST_CASE(many_subscribers)
conn.async_exec(req0, ignore, c0);
conn.async_run([&](auto ec) {
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){
BOOST_TEST(!ec);
conn.cancel(operation::receive);
});

View File

@@ -23,6 +23,8 @@ using operation = boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::async_run;
using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(test_quit1)
{
@@ -32,15 +34,13 @@ BOOST_AUTO_TEST_CASE(test_quit1)
req.push("QUIT");
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req, ignore, [](auto ec, auto) {
BOOST_TEST(!ec);
});
conn.async_run([](auto ec) {
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){
BOOST_TEST(!ec);
});
@@ -52,9 +52,7 @@ BOOST_AUTO_TEST_CASE(test_quit2)
{
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
request req1;
req1.get_config().cancel_on_connection_lost = false;
@@ -93,7 +91,7 @@ BOOST_AUTO_TEST_CASE(test_quit2)
conn.async_exec(req1, ignore, c1);
conn.async_run([&](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){
BOOST_TEST(!ec);
});

View File

@@ -14,13 +14,16 @@
#include <boost/redis.hpp>
#include <boost/redis/src.hpp>
#include "common.hpp"
#include "../examples/common/common.hpp"
#include "../examples/start.hpp"
namespace net = boost::asio;
using error_code = boost::system::error_code;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using namespace std::chrono_literals;
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace boost::asio::experimental::awaitable_operators;
@@ -32,16 +35,14 @@ net::awaitable<void> test_reconnect_impl()
request req;
req.push("QUIT");
auto const endpoints = resolve();
connection conn{ex};
int i = 0;
for (; i < 5; ++i) {
boost::system::error_code ec1, ec2;
net::connect(conn.next_layer(), endpoints);
co_await (
conn.async_exec(req, ignore, net::redirect_error(net::use_awaitable, ec1)) &&
conn.async_run(net::redirect_error(net::use_awaitable, ec2))
async_run(conn, "127.0.0.1", "6379", 10s, 10s, net::redirect_error(net::use_awaitable, ec2))
);
BOOST_TEST(!ec1);
@@ -77,11 +78,10 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
req1.push("HELLO", 3);
req1.push("BLPOP", "any", 0);
co_await connect(conn, "127.0.0.1", "6379");
st.expires_after(std::chrono::seconds{1});
co_await (
conn->async_exec(req1, ignore, redir(ec1)) ||
conn->async_run(redir(ec2)) ||
async_run(*conn, "127.0.0.1", "6379", 10s, 10s, redir(ec2)) ||
st.async_wait(redir(ec3))
);
@@ -96,11 +96,10 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
req2.push("HELLO", 3);
req2.push("QUIT");
co_await connect(conn, "127.0.0.1", "6379");
st.expires_after(std::chrono::seconds{1});
co_await (
conn->async_exec(req1, ignore, net::redirect_error(net::use_awaitable, ec1)) ||
conn->async_run(net::redirect_error(net::use_awaitable, ec2)) ||
async_run(*conn, "127.0.0.1", "6379", 10s, 10s, net::redirect_error(net::use_awaitable, ec2)) ||
st.async_wait(net::redirect_error(net::use_awaitable, ec3))
);
std::cout << "ccc" << std::endl;
@@ -111,7 +110,7 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
BOOST_AUTO_TEST_CASE(test_reconnect_and_idle)
{
run(async_test_reconnect_timeout());
start(async_test_reconnect_timeout());
}
#else
int main(){}

View File

@@ -26,6 +26,8 @@ using net::experimental::as_tuple;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::async_run;
using namespace std::chrono_literals;
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace net::experimental::awaitable_operators;
@@ -33,15 +35,13 @@ using namespace net::experimental::awaitable_operators;
auto async_cancel_run_with_timer() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto const endpoints = resolve();
connection conn{ex};
net::connect(conn.next_layer(), endpoints);
net::steady_timer st{ex};
st.expires_after(std::chrono::seconds{1});
boost::system::error_code ec1, ec2;
co_await (conn.async_run(redir(ec1)) || st.async_wait(redir(ec2)));
co_await (async_run(conn, "127.0.0.1", "6379", 10s, 10s, redir(ec1)) || st.async_wait(redir(ec2)));
BOOST_CHECK_EQUAL(ec1, boost::asio::error::basic_errors::operation_aborted);
BOOST_TEST(!ec2);
@@ -58,16 +58,14 @@ auto
async_check_cancellation_not_missed(int n, std::chrono::milliseconds ms) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto const endpoints = resolve();
connection conn{ex};
net::steady_timer timer{ex};
for (auto i = 0; i < n; ++i) {
timer.expires_after(ms);
net::connect(conn.next_layer(), endpoints);
boost::system::error_code ec1, ec2;
co_await (conn.async_run(redir(ec1)) || timer.async_wait(redir(ec2)));
co_await (async_run(conn, "127.0.0.1", "6379", 10s, 10s, redir(ec1)) || timer.async_wait(redir(ec2)));
BOOST_CHECK_EQUAL(ec1, boost::asio::error::basic_errors::operation_aborted);
std::cout << "Counter: " << i << std::endl;
}
@@ -147,10 +145,7 @@ BOOST_AUTO_TEST_CASE(check_implicit_cancel_not_missed_1024)
BOOST_AUTO_TEST_CASE(reset_before_run_completes)
{
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
// Sends a ping just as a means of waiting until we are connected.
request req;
@@ -161,8 +156,7 @@ BOOST_AUTO_TEST_CASE(reset_before_run_completes)
BOOST_TEST(!ec);
conn.reset_stream();
});
conn.async_run([&](auto ec){
async_run(conn, "127.0.0.1", "6379", 10s, 10s, [&](auto ec){
BOOST_CHECK_EQUAL(ec, net::error::operation_aborted);
});

View File

@@ -23,6 +23,18 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using endpoints = net::ip::tcp::resolver::results_type;
auto
resolve(
std::string const& host = "127.0.0.1",
std::string const& port = "6379") -> endpoints
{
net::io_context ioc;
net::ip::tcp::resolver resv{ioc};
return resv.resolve(host, port);
}
struct endpoint {
std::string host;
std::string port;

View File

@@ -6,14 +6,15 @@
// Must come before any asio header, otherwise build fails on msvc.
#include <tuple>
#include <iostream>
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/redis.hpp>
#include <boost/redis/experimental/run.hpp>
#include <boost/redis/check_health.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include "../examples/common/common.hpp"
#include "../examples/start.hpp"
namespace net = boost::asio;
using namespace net::experimental::awaitable_operators;
@@ -21,7 +22,10 @@ using steady_timer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::experimental::async_check_health;
using boost::redis::async_check_health;
using boost::redis::async_run;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using namespace std::chrono_literals;
// Push consumer
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -65,8 +69,7 @@ auto co_main(std::string host, std::string port) -> net::awaitable<void>
// The loop will reconnect on connection lost. To exit type Ctrl-C twice.
for (int i = 0; i < 10; ++i) {
co_await connect(conn, host, port);
co_await ((conn->async_run() || receiver(conn) || async_check_health(*conn) || periodic_task(conn)) &&
co_await ((async_run(*conn, host, port) || receiver(conn) || async_check_health(*conn) || periodic_task(conn)) &&
conn->async_exec(req));
conn->reset_stream();

76
tests/run.cpp Normal file
View File

@@ -0,0 +1,76 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#define BOOST_TEST_MODULE run
#include <boost/test/included/unit_test.hpp>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/redis.hpp>
#include <boost/redis/src.hpp>
namespace net = boost::asio;
using connection = boost::redis::connection;
using boost::redis::async_run;
using boost::system::error_code;
using namespace std::chrono_literals;
bool is_host_not_found(error_code ec)
{
if (ec == net::error::netdb_errors::host_not_found) return true;
if (ec == net::error::netdb_errors::host_not_found_try_again) return true;
return false;
}
BOOST_AUTO_TEST_CASE(resolve_bad_host)
{
net::io_context ioc;
connection conn{ioc};
async_run(conn, "Atibaia", "6379", 1000s, 1000s, [](auto ec){
BOOST_TEST(is_host_not_found(ec));
});
ioc.run();
}
BOOST_AUTO_TEST_CASE(resolve_with_timeout)
{
net::io_context ioc;
connection conn{ioc};
async_run(conn, "Atibaia", "6379", 1ms, 1ms, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::redis::error::resolve_timeout);
});
ioc.run();
}
BOOST_AUTO_TEST_CASE(connect_bad_port)
{
net::io_context ioc;
connection conn{ioc};
async_run(conn, "127.0.0.1", "1", 1000s, 10s, [](auto ec){
BOOST_CHECK_EQUAL(ec, net::error::basic_errors::connection_refused);
});
ioc.run();
}
BOOST_AUTO_TEST_CASE(connect_with_timeout)
{
net::io_context ioc;
connection conn{ioc};
async_run(conn, "example.com", "1", 10s, 1ms, [](auto ec){
BOOST_CHECK_EQUAL(ec, boost::redis::error::connect_timeout);
});
ioc.run();
}