mirror of https://github.com/boostorg/redis.git synced 2026-01-27 07:12:08 +00:00

Compare commits


17 Commits

Author SHA1 Message Date
Marcelo Zimbres
c88fcfb9ed Adds more doc to node class. 2023-01-08 21:51:41 +01:00
Marcelo
a56bf982ab Merge pull request #41 from Hailios/remove_duplicate_cmake
remove duplicate line in cmake
2023-01-07 20:11:50 +01:00
Jakob Lövhall
5d0ed0e986 remove duplicate line in cmake 2023-01-07 16:58:50 +01:00
Marcelo Zimbres
15deaa637d Doc improvements. 2023-01-07 00:14:29 +01:00
Marcelo Zimbres
bb8ff90351 Fixes issue 39. 2023-01-06 17:38:10 +01:00
Marcelo Zimbres
7d4902369a Doc improvements and replaces async_main to co_main. 2023-01-05 23:37:55 +01:00
Marcelo Zimbres
607ca17a89 Improvements in the documentation. 2023-01-04 22:51:53 +01:00
Marcelo Zimbres
3849ba42fd Changes:
- Fix include header order.
- Removes default completion token where it is not needed.
- Replaces yield with BOOST_ macros.
2023-01-02 23:51:50 +01:00
Marcelo Zimbres
56bcdb7914 Improvements in the docs. 2022-12-31 15:58:31 +01:00
Marcelo Zimbres
73ad66eb93 Adds example that does not user awaitable ops. 2022-12-30 18:13:09 +01:00
Marcelo Zimbres
9cf00d6a23 Adds cpp17 async example. 2022-12-30 00:04:41 +01:00
Marcelo Zimbres
a00c9e7439 Doc improvements. 2022-12-27 21:21:43 +01:00
Marcelo Zimbres
0520791100 Renames request flag. 2022-12-27 18:46:27 +01:00
Marcelo Zimbres
14b376e36e Fixes cancelation of async_exec (2). 2022-12-26 11:02:13 +01:00
Marcelo Zimbres
4f9dcc7dc5 Fixes async_exec terminal cancellation. 2022-12-25 20:01:35 +01:00
Marcelo Zimbres
ad5dd8c30b Refactors the parser so it is not header-only. 2022-12-22 21:42:41 +01:00
Marcelo Zimbres
842f864689 Using doxygen-awesome css. 2022-12-19 21:40:44 +01:00
46 changed files with 3950 additions and 2970 deletions


@@ -10,8 +10,8 @@ cmake_minimum_required(VERSION 3.14)
project(
Aedis
VERSION 1.4.0
DESCRIPTION "A redis client designed for performance and scalability"
VERSION 1.4.1
DESCRIPTION "A redis client library"
HOMEPAGE_URL "https://mzimbres.github.io/aedis"
LANGUAGES CXX
)
@@ -73,92 +73,108 @@ endif()
# Executables
#=======================================================================
add_executable(intro examples/intro.cpp)
target_link_libraries(intro common)
target_compile_features(intro PUBLIC cxx_std_20)
add_test(intro intro)
add_executable(cpp20_intro examples/cpp20_intro.cpp)
target_link_libraries(cpp20_intro common)
target_compile_features(cpp20_intro PUBLIC cxx_std_20)
add_test(cpp20_intro cpp20_intro)
if (MSVC)
target_compile_options(intro PRIVATE /bigobj)
target_compile_definitions(intro PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp20_intro PRIVATE /bigobj)
target_compile_definitions(cpp20_intro PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(intro_sync examples/intro_sync.cpp)
target_compile_features(intro_sync PUBLIC cxx_std_20)
add_test(intro_sync intro_sync)
add_test(intro_sync intro_sync)
add_executable(cpp20_intro_awaitable_ops examples/cpp20_intro_awaitable_ops.cpp)
target_link_libraries(cpp20_intro_awaitable_ops common)
target_compile_features(cpp20_intro_awaitable_ops PUBLIC cxx_std_20)
add_test(cpp20_intro_awaitable_ops cpp20_intro_awaitable_ops)
if (MSVC)
target_compile_options(intro_sync PRIVATE /bigobj)
target_compile_definitions(intro_sync PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp20_intro_awaitable_ops PRIVATE /bigobj)
target_compile_definitions(cpp20_intro_awaitable_ops PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(chat_room examples/chat_room.cpp)
target_compile_features(chat_room PUBLIC cxx_std_20)
target_link_libraries(chat_room common)
add_executable(cpp17_intro examples/cpp17_intro.cpp)
target_compile_features(cpp17_intro PUBLIC cxx_std_17)
add_test(cpp17_intro cpp17_intro)
if (MSVC)
target_compile_options(chat_room PRIVATE /bigobj)
target_compile_definitions(chat_room PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp17_intro PRIVATE /bigobj)
target_compile_definitions(cpp17_intro PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(containers examples/containers.cpp)
target_compile_features(containers PUBLIC cxx_std_20)
target_link_libraries(containers common)
add_test(containers containers)
add_executable(cpp17_intro_sync examples/cpp17_intro_sync.cpp)
target_compile_features(cpp17_intro_sync PUBLIC cxx_std_17)
add_test(cpp17_intro_sync cpp17_intro_sync)
if (MSVC)
target_compile_options(containers PRIVATE /bigobj)
target_compile_definitions(containers PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp17_intro_sync PRIVATE /bigobj)
target_compile_definitions(cpp17_intro_sync PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(echo_server examples/echo_server.cpp)
target_compile_features(echo_server PUBLIC cxx_std_20)
target_link_libraries(echo_server common)
add_executable(cpp20_chat_room examples/cpp20_chat_room.cpp)
target_compile_features(cpp20_chat_room PUBLIC cxx_std_20)
target_link_libraries(cpp20_chat_room common)
if (MSVC)
target_compile_options(echo_server PRIVATE /bigobj)
target_compile_definitions(echo_server PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp20_chat_room PRIVATE /bigobj)
target_compile_definitions(cpp20_chat_room PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(resolve_with_sentinel examples/resolve_with_sentinel.cpp)
target_compile_features(resolve_with_sentinel PUBLIC cxx_std_20)
target_link_libraries(resolve_with_sentinel common)
#add_test(resolve_with_sentinel resolve_with_sentinel)
add_executable(cpp20_containers examples/cpp20_containers.cpp)
target_compile_features(cpp20_containers PUBLIC cxx_std_20)
target_link_libraries(cpp20_containers common)
add_test(cpp20_containers cpp20_containers)
if (MSVC)
target_compile_options(resolve_with_sentinel PRIVATE /bigobj)
target_compile_definitions(resolve_with_sentinel PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp20_containers PRIVATE /bigobj)
target_compile_definitions(cpp20_containers PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(serialization examples/serialization.cpp)
target_compile_features(serialization PUBLIC cxx_std_20)
target_link_libraries(serialization common)
add_test(serialization serialization)
add_executable(cpp20_echo_server examples/cpp20_echo_server.cpp)
target_compile_features(cpp20_echo_server PUBLIC cxx_std_20)
target_link_libraries(cpp20_echo_server common)
if (MSVC)
target_compile_options(serialization PRIVATE /bigobj)
target_compile_definitions(serialization PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp20_echo_server PRIVATE /bigobj)
target_compile_definitions(cpp20_echo_server PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(subscriber examples/subscriber.cpp)
target_compile_features(subscriber PUBLIC cxx_std_20)
target_link_libraries(subscriber common)
add_executable(cpp20_resolve_with_sentinel examples/cpp20_resolve_with_sentinel.cpp)
target_compile_features(cpp20_resolve_with_sentinel PUBLIC cxx_std_20)
target_link_libraries(cpp20_resolve_with_sentinel common)
#add_test(cpp20_resolve_with_sentinel cpp20_resolve_with_sentinel)
if (MSVC)
target_compile_options(subscriber PRIVATE /bigobj)
target_compile_definitions(subscriber PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp20_resolve_with_sentinel PRIVATE /bigobj)
target_compile_definitions(cpp20_resolve_with_sentinel PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(intro_tls examples/intro_tls.cpp)
target_compile_features(intro_tls PUBLIC cxx_std_20)
add_test(intro_tls intro_tls)
target_link_libraries(intro_tls OpenSSL::Crypto OpenSSL::SSL)
target_link_libraries(intro_tls common)
add_executable(cpp20_serialization examples/cpp20_serialization.cpp)
target_compile_features(cpp20_serialization PUBLIC cxx_std_20)
target_link_libraries(cpp20_serialization common)
add_test(cpp20_serialization cpp20_serialization)
if (MSVC)
target_compile_options(intro_tls PRIVATE /bigobj)
target_compile_definitions(intro_tls PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp20_serialization PRIVATE /bigobj)
target_compile_definitions(cpp20_serialization PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(low_level_async examples/low_level_async.cpp)
target_compile_features(low_level_async PUBLIC cxx_std_20)
add_test(low_level_async low_level_async)
target_link_libraries(low_level_async common)
add_executable(cpp20_subscriber examples/cpp20_subscriber.cpp)
target_compile_features(cpp20_subscriber PUBLIC cxx_std_20)
target_link_libraries(cpp20_subscriber common)
if (MSVC)
target_compile_options(low_level_async PRIVATE /bigobj)
target_compile_definitions(low_level_async PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp20_subscriber PRIVATE /bigobj)
target_compile_definitions(cpp20_subscriber PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(cpp20_intro_tls examples/cpp20_intro_tls.cpp)
target_compile_features(cpp20_intro_tls PUBLIC cxx_std_20)
add_test(cpp20_intro_tls cpp20_intro_tls)
target_link_libraries(cpp20_intro_tls OpenSSL::Crypto OpenSSL::SSL)
target_link_libraries(cpp20_intro_tls common)
if (MSVC)
target_compile_options(cpp20_intro_tls PRIVATE /bigobj)
target_compile_definitions(cpp20_intro_tls PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(cpp20_low_level_async examples/cpp20_low_level_async.cpp)
target_compile_features(cpp20_low_level_async PUBLIC cxx_std_20)
add_test(cpp20_low_level_async cpp20_low_level_async)
target_link_libraries(cpp20_low_level_async common)
if (MSVC)
target_compile_options(cpp20_low_level_async PRIVATE /bigobj)
target_compile_definitions(cpp20_low_level_async PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(echo_server_client benchmarks/cpp/asio/echo_server_client.cpp)
@@ -175,12 +191,12 @@ if (MSVC)
target_compile_definitions(echo_server_direct PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(low_level_sync examples/low_level_sync.cpp)
target_compile_features(low_level_sync PUBLIC cxx_std_17)
add_test(low_level_sync low_level_sync)
add_executable(cpp17_low_level_sync examples/cpp17_low_level_sync.cpp)
target_compile_features(cpp17_low_level_sync PUBLIC cxx_std_17)
add_test(cpp17_low_level_sync cpp17_low_level_sync)
if (MSVC)
target_compile_options(low_level_sync PRIVATE /bigobj)
target_compile_definitions(low_level_sync PRIVATE _WIN32_WINNT=0x0601)
target_compile_options(cpp17_low_level_sync PRIVATE /bigobj)
target_compile_definitions(cpp17_low_level_sync PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_exec tests/conn_exec.cpp)
@@ -191,6 +207,14 @@ if (MSVC)
target_compile_definitions(test_conn_exec PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_exec_retry tests/conn_exec_retry.cpp)
target_compile_features(test_conn_exec_retry PUBLIC cxx_std_20)
add_test(test_conn_exec_retry test_conn_exec_retry)
if (MSVC)
target_compile_options(test_conn_exec_retry PRIVATE /bigobj)
target_compile_definitions(test_conn_exec_retry PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_push tests/conn_push.cpp)
target_compile_features(test_conn_push PUBLIC cxx_std_20)
add_test(test_conn_push test_conn_push)

README.md

@@ -1,177 +1,244 @@
# Aedis
## Overview
Aedis is a [Redis](https://redis.io/) client library built on top of
[Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html)
[Boost.Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html)
that implements the latest version of the Redis communication
protocol
[RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md).
It makes communication with a Redis server easy by hiding most of
the low-level Asio-related code away from the user, which in the majority of
the cases will be concerned with only three library entities
It makes communication with a Redis server easy by hiding low-level
code away from the user, who, in the majority of cases, will be
concerned with only three library entities
* `aedis::connection`: A connection to the Redis server.
* `aedis::resp3::request`: A container of Redis commands.
* `aedis::adapt()`: Adapts data structures to receive responses.
* `aedis::connection`: A connection to the Redis server with
high-level functions to execute Redis commands and receive server
pushes, with support for automatic command
[pipelines](https://redis.io/docs/manual/pipelining/).
* `aedis::resp3::request`: A container of Redis commands that supports
STL containers and user defined data types.
* `aedis::adapt()`: A function that adapts data structures to receive responses.
For example, the coroutine below uses a short-lived connection to read Redis
[hashes](https://redis.io/docs/data-types/hashes/)
in a `std::map` (see intro.cpp and containers.cpp)
In the next sections we will cover all those points in detail with
examples. The requirements for using Aedis are
* Boost 1.80 or greater.
* C++17 minimum.
* Redis 6 or higher (must support RESP3).
* Gcc (10, 11, 12), Clang (11, 13, 14) and Visual Studio (16 2019, 17 2022).
* Have basic-level knowledge about Redis and understand Asio and its asynchronous model.
Readers who are not familiar with Redis can learn more about
it at https://redis.io/docs/; in essence
> Redis is an open source (BSD licensed), in-memory data structure
> store used as a database, cache, message broker, and streaming
> engine. Redis provides data structures such as strings, hashes,
> lists, sets, sorted sets with range queries, bitmaps, hyperloglogs,
> geospatial indexes, and streams. Redis has built-in replication, Lua
> scripting, LRU eviction, transactions, and different levels of
> on-disk persistence, and provides high availability via Redis
> Sentinel and automatic partitioning with Redis Cluster.
<a name="connection"></a>
## Connection
Let us start with a simple application that uses a short-lived
connection to read Redis
[hashes](https://redis.io/docs/data-types/hashes/) in a `std::map`
```cpp
auto async_main() -> net::awaitable<void>
auto co_main() -> net::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// From examples/common.hpp to avoid verbosity
co_await connect(conn, "127.0.0.1", "6379");
// A request contains multiple commands.
// A request can contain multiple commands.
resp3::request req;
req.push("HELLO", 3);
req.push("HGETALL", "hset-key");
req.push("QUIT");
// Responses as tuple elements.
std::tuple<aedis::ignore, std::map<std::string, std::string>, aedis::ignore> resp;
// The tuple elements will store the responses to each individual
// command. The responses to HELLO and QUIT are being ignored for
// simplicity.
std::tuple<ignore, std::map<std::string, std::string>, ignore> resp;
// Executes the request and reads the response.
// Executes the request. See below for why we use operator ||.
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
// Use the map from std::get<1>(resp) ...
}
```
The execution of `connection::async_exec` above is composed with
`connection::async_run` with the aid of the Asio awaitable operator ||
that ensures that one operation is cancelled as soon as the other
completes, these functions play the following roles
The example above uses the Asio awaitable `operator ||` to compose
`connection::async_exec` and `connection::async_run` into an
operation we can `co_await` on. It also cancels one of
the operations when the other completes. The roles played by these
functions are
* `connection::async_exec`: Execute commands (i.e. write the request and reads the response).
* `connection::async_run`: Coordinate read and write operations and remains suspended until the connection is lost.
* `connection::async_exec`: Executes commands by queuing the request
for writing and waiting for the response sent back by
Redis. It can be called concurrently from multiple places in your code
(see the sketch after this list).
* `connection::async_run`: Coordinates low-level read and write
operations. More specifically, it will hand IO control to
`async_exec` when a response arrives and to `async_receive` when a
server push is received. It is also responsible for triggering writes of pending
requests.
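For instance, the sketch below (an illustration only, not code from the repository; the coroutine name `ping_once` and the messages are made up) issues two requests concurrently on the same connection while `async_run`, running elsewhere, performs the IO and merges both requests into a single pipeline.
```cpp
auto ping_once(std::shared_ptr<connection> conn, std::string msg) -> net::awaitable<void>
{
   resp3::request req;
   req.push("PING", msg);

   std::tuple<std::string> resp;
   co_await conn->async_exec(req, adapt(resp));
   std::cout << "PING: " << std::get<0>(resp) << std::endl;
}

// Both calls to async_exec are outstanding at the same time; the connection
// queues them and writes them together while async_run does the IO.
co_await (ping_once(conn, "msg1") && ping_once(conn, "msg2"));
```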
Let us dig in.
The example above is also available in other programming styles for comparison
<a name="connection"></a>
## Connection
* cpp20_intro_awaitable_ops.cpp: The version shown above.
* cpp20_intro.cpp: Does not use awaitable operators.
* cpp20_intro_tls.cpp: Communicates over TLS.
* cpp17_intro.cpp: Uses callbacks and requires C++17.
* cpp17_intro_sync.cpp: Runs `async_run` in a separate thread and
performs synchronous calls to `async_exec`.
In general we will want to reuse the same connection for multiple
requests, we can do this with the example above by decoupling the
HELLO command and the call to `async_run` in a separate coroutine
For performance reasons we will usually want to perform multiple
requests with the same connection. We can do this in the example above
by letting `async_run` run detached in a separate coroutine, for
example (see cpp20_intro.cpp)
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
co_await connect(conn, "127.0.0.1", "6379");
resp3::request req;
req.push("HELLO", 3); // Upgrade to RESP3
// Notice we use && instead of || so async_run is not cancelled
// when the response to HELLO comes.
co_await (conn->async_run() && conn->async_exec(req));
co_await conn->async_run();
}
```
We can now let `run` run detached in the background while other
coroutines perform requests on the connection
```cpp
auto async_main() -> net::awaitable<void>
auto hello(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// Calls async_run detached.
net::co_spawn(ex, run(conn), net::detached);
// Here we can pass conn around to other coroutines so they can make requests.
...
}
```
With this separation, it is now easy to incorporate other operations
in our application, for example, to cancel the connection on `SIGINT`
and `SIGTERM` we can extend `run` as follows
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
co_await connect(conn, "127.0.0.1", "6379");
signal_set sig{ex, SIGINT, SIGTERM};
resp3::request req;
req.push("HELLO", 3);
co_await ((conn->async_run() || sig.async_wait()) && conn->async_exec(req));
co_await conn->async_exec(req);
}
```
Likewise we can incorporate support for server pushes, healthy checks and pubsub
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto ping(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
co_await connect(conn, "127.0.0.1", "6379");
signal_set sig{ex, SIGINT, SIGTERM};
resp3::request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel1", "channel2");
req.push("PING", "Hello world");
co_await ((conn->async_run() || sig.async_wait() || receiver(conn) || healthy_checker(conn))
&& conn->async_exec(req));
std::tuple<std::string> resp;
co_await conn->async_exec(req, adapt(resp));
// Use the response ...
}
```
The definition of `receiver` and `healthy_checker` above can be found
in subscriber.cpp. Adding a loop around `async_run` produces a simple
way to support reconnection _while there are pending operations on the connection_,
for example, to reconnect to the same address
auto quit(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
resp3::request req;
req.push("QUIT");
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
co_await conn->async_exec(req);
}
auto co_main() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
steady_timer timer{ex};
auto conn = std::make_shared<connection>(ex);
net::co_spawn(ex, run(conn), net::detached);
co_await hello(conn);
co_await ping(conn);
co_await quit(conn);
resp3::request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel1", "channel2");
// conn can be passed around to other coroutines that need to
// communicate with Redis. For example, sessions in an HTTP or
// WebSocket server.
}
```
With this separation, it is now easy to incorporate other long-running
operations in our application, for example, the run coroutine below
adds signal handling and a healthy checker (see cpp20_echo_server.cpp)
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
signal_set sig{co_await net::this_coro::executor, SIGINT, SIGTERM};
co_await connect(conn, "127.0.0.1", "6379");
co_await (conn->async_run() || sig.async_wait() || healthy_checker(conn));
}
```
The definition of the `healthy_checker` used above can be found in common.cpp.
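For illustration only, a health checker along these lines could look like the sketch below (this is not the actual common.cpp implementation, and the two-second intervals are arbitrary): it sends a periodic PING and returns when no response arrives in time, so that the `operator ||` composition in `run` cancels `async_run` and closes the connection.
```cpp
auto healthy_checker(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
   steady_timer timer{co_await net::this_coro::executor};

   resp3::request req;
   req.push("PING", "healthy-checker");

   for (;;) {
      // Give the server a bounded amount of time to answer the PING.
      timer.expires_after(std::chrono::seconds{2});
      std::tuple<std::string> resp;
      auto res = co_await (conn->async_exec(req, adapt(resp)) || timer.async_wait());
      if (res.index() == 1)
         co_return; // No response in time: let the caller tear the connection down.

      // Wait before issuing the next PING.
      timer.expires_after(std::chrono::seconds{2});
      co_await timer.async_wait();
   }
}
```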
### Server pushes
Redis servers can also send a variety of pushes to the client; some of
them are
* [Pubsub](https://redis.io/docs/manual/pubsub/)
* [Keyspace notification](https://redis.io/docs/manual/keyspace-notifications/)
* [Client-side caching](https://redis.io/docs/manual/client-side-caching/)
The connection class supports server pushes by means of the
`aedis::connection::async_receive` function; the coroutine below shows how
to use it
```cpp
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
using resp_type = std::vector<resp3::node<std::string>>;
for (resp_type resp;;) {
co_await conn->async_receive(adapt(resp));
// Use resp and clear the response for a new push.
resp.clear();
}
}
```
The `receiver` defined above can be run detached or incorporated
in the `run` function as we did for the `healthy_checker`
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
signal_set sig{co_await net::this_coro::executor, SIGINT, SIGTERM};
co_await connect(conn, "127.0.0.1", "6379");
co_await (conn->async_run() || sig.async_wait() || healthy_checker(conn) || receiver(conn));
}
```
### Reconnecting
Adding a loop around `async_run` produces a simple way to support
reconnection _while there are pending operations on the connection_,
for example, to reconnect to the same address (see cpp20_subscriber.cpp)
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
steady_timer timer{co_await net::this_coro::executor};
for (;;) {
co_await connect(conn, "127.0.0.1", "6379");
co_await ((conn->async_run() || healthy_checker(conn) || receiver(conn))
&& conn->async_exec(req));
co_await (conn->async_run() || healthy_checker(conn) || receiver(conn));
// Prepare the stream for a new connection.
conn->reset_stream();
// Waits one second before trying to reconnect.
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait();
}
}
```
For failover with sentinels see `resolve_with_sentinel.cpp`. At
this point the reasons why `async_run` was introduced in Aedis
might have become apparent to the reader
* Provide quick reaction to disconnections and hence faster failover.
* Support server pushes and requests in the same connection object, concurrently.
* Separate requests, handling of server pushes and reconnection operations.
This feature results in a considerable simplification of backend code
and makes it easier to write failover-safe applications. For example,
a WebSocket server might have 10k sessions communicating with Redis at
the time the connection is lost (or perhaps killed by the server admin
to force a failover). It would be concerning if each individual session
were to throw exceptions and handle errors on its own. With the pattern shown
above the only place that has to manage the error is the `run` function.
### Cancellation
Aedis supports both implicit and explicit cancellation of connection
operations. Explicit cancellation is supported by means of the
`aedis::connection::cancel` member function. Implicit cancellation,
like those that may happen when using Asio awaitable operators && and
|| will be discussed with more detail below.
```cpp
co_await (conn.async_run(...) && conn.async_exec(...))
```
* Provide a simple way to send HELLO and perform channel subscription.
`aedis::connection::cancel` member function. Implicit
terminal cancellation, such as what happens when using the Asio
awaitable `operator ||`, will be discussed in more detail below.
```cpp
co_await (conn.async_run(...) || conn.async_exec(...))
@@ -186,16 +253,10 @@ co_await (conn.async_exec(...) || time.async_wait(...))
* Provides a way to limit how long the execution of a single request
should last.
* The cancellation will be ignored if the request has already
been written to the socket.
* WARNING: If the timer fires after the request has been sent but before the
response has been received, the connection will be closed.
* It is usually a better idea to have a healthy checker than adding
per request timeout, see subscriber.cpp for an example.
```cpp
co_await (conn.async_run(...) || time.async_wait(...))
```
* Sets a limit on how long the connection should live.
a per-request timeout, see cpp20_subscriber.cpp for an example.
```cpp
co_await (conn.async_exec(...) || conn.async_exec(...) || ... || conn.async_exec(...))
@@ -203,8 +264,8 @@ co_await (conn.async_exec(...) || conn.async_exec(...) || ... || conn.async_exec
* This works but is unnecessary. Unless the user has set
`aedis::resp3::request::config::coalesce` to `false`, and he
shouldn't, the connection will automatically merge the individual
requests into a single payload anyway.
usually shouldn't, the connection will automatically merge the
individual requests into a single payload.
<a name="requests"></a>
## Requests
@@ -218,6 +279,7 @@ Redis documentation they are called
std::list<std::string> list {...};
std::map<std::string, mystruct> map { ...};
// The request can contain multiple commands.
request req;
// Command with variable length of arguments.
@@ -244,10 +306,10 @@ with integer data types e.g. `int` and `std::string` out of the box.
To send your own data type define a `to_bulk` function like this
```cpp
// Example struct.
// User defined type.
struct mystruct {...};
// Serialize your data structure here.
// Serialize it in to_bulk.
void to_bulk(std::pmr::string& to, mystruct const& obj)
{
std::string dummy = "Dummy serializaiton string.";
@@ -266,7 +328,7 @@ std::map<std::string, mystruct> map {...};
req.push_range("HSET", "key", map);
```
Example serialization.cpp shows how store json strings in Redis.
Example cpp20_serialization.cpp shows how to store JSON strings in Redis.
<a name="responses"></a>
@@ -281,7 +343,7 @@ reader is advised to read it carefully.
Aedis uses the following strategy to support Redis responses
* **Static**: For `aedis::resp3::request` whose sizes are known at compile time
std::tuple is supported.
`std::tuple` is supported.
* **Dynamic**: Otherwise use `std::vector<aedis::resp3::node<std::string>>` (see the sketch after this list).
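As a rough sketch of the dynamic case (illustration only; the `get_many` helper and the key names are made up, the types are the ones documented above), a response whose size is known only at runtime can be read into a flat vector of RESP3 nodes:
```cpp
auto get_many(std::shared_ptr<connection> conn, std::vector<std::string> const& keys)
   -> net::awaitable<void>
{
   resp3::request req;
   for (auto const& key : keys) // Number of commands known only at runtime.
      req.push("GET", key);

   // Flattened RESP3 nodes for all responses in the request.
   std::vector<resp3::node<std::string>> resp;
   co_await conn->async_exec(req, adapt(resp));

   for (auto const& e : resp)
      std::cout << e.value << "\n";
}
```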
For example, below is a request with a compile time size
@@ -390,13 +452,13 @@ subset of the RESP3 specification.
### Pushes
Commands that have push response like
Commands that have no response like
* `"SUBSCRIBE"`
* `"PSUBSCRIBE"`
* `"UNSUBSCRIBE"`
must be **NOT** be included in the tuple. For example, the request below
must **NOT** be included in the response tuple. For example, the request below
```cpp
request req;
@@ -465,7 +527,7 @@ std::tuple<
co_await conn->async_exec(req, adapt(resp));
```
For a complete example see containers.cpp.
For a complete example see cpp20_containers.cpp.
### Deserialization
@@ -541,26 +603,28 @@ from Redis with `HGETALL`, some of the options are
* `std::map<U, V>`: Efficient if you are storing serialized data. Avoids temporaries and requires `from_bulk` for `U` and `V`.
In addition to the above users can also use unordered versions of the
containers. The same reasoning also applies to sets e.g. `SMEMBERS`
containers. The same reasoning applies to sets e.g. `SMEMBERS`
and other data structures in general.
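For reference, a `from_bulk` overload for the `mystruct` type used earlier might look like the sketch below; the signature mirrors the one in the serialization example of this change set and the body is only a placeholder.
```cpp
// Deserializes a Redis bulk string into mystruct so it can be used as a key
// or value type in the containers listed above.
void from_bulk(mystruct& obj, std::string_view sv, boost::system::error_code& ec)
{
   // Parse sv into obj here and set ec if the payload is malformed.
}
```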
## Examples
These examples demonstrate what has been discussed so far.
The examples below show how to use the features discussed so far
* intro.cpp: The Aedis hello-world program. Sends one command and quits the connection.
* intro_tls.cpp: Same as intro.cpp but over TLS.
* intro_sync.cpp: Shows how to use the connection class synchronously.
* containers.cpp: Shows how to send and receive STL containers and how to use transactions.
* serialization.cpp: Shows how to serialize types using Boost.Json.
* resolve_with_sentinel.cpp: Shows how to resolve a master address using sentinels.
* subscriber.cpp: Shows how to implement pubsub with reconnection re-subscription.
* echo_server.cpp: A simple TCP echo server.
* chat_room.cpp: A command line chat built on Redis pubsub.
* low_level_sync.cpp: Sends a ping synchronously using the low-level API.
* low_level_async.cpp: Sends a ping asynchronously using the low-level API.
* cpp20_intro_awaitable_ops.cpp: The version shown above.
* cpp20_intro.cpp: Does not use awaitable operators.
* cpp20_intro_tls.cpp: Communicates over TLS.
* cpp20_containers.cpp: Shows how to send and receive STL containers and how to use transactions.
* cpp20_serialization.cpp: Shows how to serialize types using Boost.Json.
* cpp20_resolve_with_sentinel.cpp: Shows how to resolve a master address using sentinels.
* cpp20_subscriber.cpp: Shows how to implement pubsub with reconnection and re-subscription.
* cpp20_echo_server.cpp: A simple TCP echo server.
* cpp20_chat_room.cpp: A command line chat built on Redis pubsub.
* cpp20_low_level_async.cpp: Sends a ping asynchronously using the low-level API.
* cpp17_low_level_sync.cpp: Sends a ping synchronously using the low-level API.
* cpp17_intro.cpp: Uses callbacks and requires C++17.
* cpp17_intro_sync.cpp: Runs `async_run` in a separate thread and performs synchronous calls to `async_exec`.
To avoid repetition code that is common to all examples has been
To avoid repetition, code that is common to some examples has been
grouped in common.hpp. The main function used in some async examples
has been factored out into the main.cpp file.
@@ -667,10 +731,10 @@ stars, namely
### Aedis vs Redis-plus-plus
Before we start it is important to mentioning some of the things
Before we start it is important to mention some of the things
redis-plus-plus does not support
* The latest version of the communication protocol RESP3. Without it it is impossible to support some important Redis features like client side caching, among other things.
* The latest version of the communication protocol RESP3. Without that it is impossible to support some important Redis features like client side caching, among other things.
* Coroutines.
* Reading responses directly in user data structures to avoid creating temporaries.
* Error handling with support for error-code.
@@ -767,7 +831,7 @@ It is also not clear how are pipelines realised with this design
## Reference
* [High-Level](#high-level-api): Covers the topics discussed in this document.
* [Low-Level](#low-level-api): Covers low-level building blocks. Provided mostly for developers, most users won't need any information provided here.
* [Low-Level](#low-level-api): Covers low-level building blocks. Provided mostly for developers, users won't usually need any information provided here.
## Installation
@@ -781,25 +845,11 @@ library, so you can starting using it right away by adding the
```
in no more than one source file in your applications. To build the
examples and test cmake is supported, for example
examples and tests, cmake is supported, for example
```bash
BOOST_ROOT=/opt/boost_1_80_0 cmake --preset dev
```
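A minimal sketch of the header-only setup mentioned above (the include names are taken from the example sources in this change set):
```cpp
// In every translation unit that uses the library:
#include <aedis.hpp>

// In no more than one source file, to provide the compiled parts of the library:
#include <aedis/src.hpp>
```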
The requirements for using Aedis are
- Boost 1.80 or greater.
- C++17 minimum.
- Redis 6 or higher (must support RESP3).
- Optionally also redis-cli and Redis Sentinel.
The following compilers are supported
- Gcc: 10, 11, 12.
- Clang: 11, 13, 14.
- Visual Studio 17 2022, Visual Studio 16 2019.
## Acknowledgement
Acknowledgement to people that helped shape Aedis
@@ -809,11 +859,13 @@ Acknowledgement to people that helped shape Aedis
* Petr Dannhofer ([Eddie-cz](https://github.com/Eddie-cz)): For helping me understand how the `AUTH` and `HELLO` command can influence each other.
* Mohammad Nejati ([ashtum](https://github.com/ashtum)): For pointing out scenarios where calls to `async_exec` should fail when the connection is lost.
* Klemens Morgenstern ([klemens-morgenstern](https://github.com/klemens-morgenstern)): For useful discussion about timeouts, cancellation, synchronous interfaces and general help with Asio.
* Vinnie Falco ([vinniefalco](https://github.com/vinniefalco)): For general suggestions about how to improve the code and the documentation.
## Changelog
### v1.4.0
### v1.4.0-1
* Renames `retry_on_connection_lost` to `cancel_if_unresponded`. (v1.4.1)
* Removes dependency on Boost.Hana, boost::string_view, Boost.Variant2 and Boost.Spirit.
* Fixes build and setup CI on windows.
@@ -989,7 +1041,7 @@ Acknowledgement to people that helped shape Aedis
* `connection::async_receive_event` is now being used to communicate
internal events to the user, such as resolve, connect, push etc. For
examples see subscriber.cpp and `connection::event`.
examples see cpp20_subscriber.cpp and `connection::event`.
* The `aedis` directory has been moved to `include` to look more
similar to Boost libraries. Users should now replace `-I/aedis-path`


@@ -1285,7 +1285,7 @@ HTML_STYLESHEET =
# list). For an example see the documentation.
# This tag requires that the tag GENERATE_HTML is set to YES.
HTML_EXTRA_STYLESHEET = doc/aedis.css
HTML_EXTRA_STYLESHEET = doc/doxygen-awesome.css doc/doxygen-awesome-sidebar-only.css
# The HTML_EXTRA_FILES tag can be used to specify one or more extra images or
# other source files which should be copied to the HTML output directory. Note
@@ -1578,7 +1578,7 @@ ECLIPSE_DOC_ID = org.doxygen.Project
# The default value is: NO.
# This tag requires that the tag GENERATE_HTML is set to YES.
DISABLE_INDEX = YES
DISABLE_INDEX = NO
# The GENERATE_TREEVIEW tag is used to specify whether a tree-like index
# structure should be generated to display hierarchical information. If the tag

File diff suppressed because it is too large.


@@ -0,0 +1,115 @@
/**
Doxygen Awesome
https://github.com/jothepro/doxygen-awesome-css
MIT License
Copyright (c) 2021 jothepro
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
html {
/* side nav width. MUST be = `TREEVIEW_WIDTH`.
* Make sure it is wide enough to contain the page title (logo + title + version)
*/
--side-nav-fixed-width: 335px;
--menu-display: none;
--top-height: 120px;
--toc-sticky-top: -25px;
--toc-max-height: calc(100vh - 2 * var(--spacing-medium) - 25px);
}
#projectname {
white-space: nowrap;
}
@media screen and (min-width: 768px) {
html {
--searchbar-background: var(--page-background-color);
}
#side-nav {
min-width: var(--side-nav-fixed-width);
max-width: var(--side-nav-fixed-width);
top: var(--top-height);
overflow: visible;
}
#nav-tree, #side-nav {
height: calc(100vh - var(--top-height)) !important;
}
#nav-tree {
padding: 0;
}
#top {
display: block;
border-bottom: none;
height: var(--top-height);
margin-bottom: calc(0px - var(--top-height));
max-width: var(--side-nav-fixed-width);
overflow: hidden;
background: var(--side-nav-background);
}
#main-nav {
float: left;
padding-right: 0;
}
.ui-resizable-handle {
cursor: default;
width: 1px !important;
box-shadow: 0 calc(-2 * var(--top-height)) 0 0 var(--separator-color);
}
#nav-path {
position: fixed;
right: 0;
left: var(--side-nav-fixed-width);
bottom: 0;
width: auto;
}
#doc-content {
height: calc(100vh - 31px) !important;
padding-bottom: calc(3 * var(--spacing-large));
padding-top: calc(var(--top-height) - 80px);
box-sizing: border-box;
margin-left: var(--side-nav-fixed-width) !important;
}
#MSearchBox {
width: calc(var(--side-nav-fixed-width) - calc(2 * var(--spacing-medium)));
}
#MSearchField {
width: calc(var(--side-nav-fixed-width) - calc(2 * var(--spacing-medium)) - 65px);
}
#MSearchResultsWindow {
left: var(--spacing-medium) !important;
right: auto;
}
}

doc/doxygen-awesome.css (new file)

File diff suppressed because it is too large.


@@ -4,5 +4,4 @@
* accompanying file LICENSE.txt)
*/
#include <aedis.hpp>
#include <aedis/src.hpp>


@@ -10,11 +10,19 @@
#include "common.hpp"
extern boost::asio::awaitable<void> async_main();
extern boost::asio::awaitable<void> co_main(std::string, std::string);
auto main() -> int
auto main(int argc, char * argv[]) -> int
{
return run(async_main());
std::string host = "127.0.0.1";
std::string port = "6379";
if (argc == 3) {
host = argv[1];
port = argv[2];
}
return run(co_main(host, port));
}
#else // defined(BOOST_ASIO_HAS_CO_AWAIT)

examples/cpp17_intro.cpp (new file)

@@ -0,0 +1,101 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <iostream>
#include <boost/asio.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::resp3::request;
using aedis::adapt;
using aedis::operation;
void log(boost::system::error_code const& ec, char const* prefix)
{
std::clog << prefix << ec.message() << std::endl;
}
auto main(int argc, char * argv[]) -> int
{
try {
std::string host = "127.0.0.1";
std::string port = "6379";
if (argc == 3) {
host = argv[1];
port = argv[2];
}
// The request
resp3::request req;
req.push("HELLO", 3);
req.push("PING", "Hello world");
req.push("QUIT");
// The response.
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
net::io_context ioc;
// IO objects.
net::ip::tcp::resolver resv{ioc};
aedis::connection conn{ioc};
// Resolve endpoints.
net::ip::tcp::resolver::results_type endpoints;
// async_run callback.
auto on_run = [](auto ec)
{
if (ec)
return log(ec, "on_run: ");
};
// async_exec callback.
auto on_exec = [&](auto ec, auto)
{
if (ec) {
conn.cancel(operation::run);
return log(ec, "on_exec: ");
}
std::cout << "PING: " << std::get<1>(resp) << std::endl;
};
// Connect callback.
auto on_connect = [&](auto ec, auto)
{
if (ec)
return log(ec, "on_connect: ");
conn.async_run(on_run);
conn.async_exec(req, adapt(resp), on_exec);
};
// Resolve callback.
auto on_resolve = [&](auto ec, auto const& addrs)
{
if (ec)
return log(ec, "on_resolve: ");
endpoints = addrs;
net::async_connect(conn.next_layer(), endpoints, on_connect);
};
resv.async_resolve(host, port, on_resolve);
ioc.run();
return 0;
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
return 1;
}


@@ -31,15 +31,30 @@ auto exec(std::shared_ptr<connection> conn, resp3::request const& req, Adapter a
auto logger = [](auto const& ec)
{ std::clog << "Run: " << ec.message() << std::endl; };
int main()
auto main(int argc, char * argv[]) -> int
{
try {
std::string host = "127.0.0.1";
std::string port = "6379";
if (argc == 3) {
host = argv[1];
port = argv[2];
}
net::io_context ioc{1};
auto conn = std::make_shared<connection>(ioc);
// Resolves the address
net::ip::tcp::resolver resv{ioc};
auto const res = resv.resolve("127.0.0.1", "6379");
auto const res = resv.resolve(host, port);
// Connect to Redis
net::connect(conn->next_layer(), res);
// Starts a thread that will call io_context::run, on which
// the connection will run.
std::thread t{[conn, &ioc]() {
conn->async_run(logger);
ioc.run();
@@ -51,6 +66,8 @@ int main()
req.push("QUIT");
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
// Executes commands synchronously.
exec(conn, req, adapt(resp));
std::cout << "Response: " << std::get<1>(resp) << std::endl;


@@ -16,12 +16,20 @@ namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::adapter::adapt2;
int main()
auto main(int argc, char * argv[]) -> int
{
try {
std::string host = "127.0.0.1";
std::string port = "6379";
if (argc == 3) {
host = argv[1];
port = argv[2];
}
net::io_context ioc;
net::ip::tcp::resolver resv{ioc};
auto const res = resv.resolve("127.0.0.1", "6379");
auto const res = resv.resolve(host, port);
net::ip::tcp::socket socket{ioc};
net::connect(socket, res);


@@ -47,7 +47,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
}
// Called from the main function (see main.cpp)
auto async_main() -> net::awaitable<void>
auto co_main(std::string host, std::string port) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
@@ -58,13 +58,13 @@ auto async_main() -> net::awaitable<void>
req.push("HELLO", 3);
req.push("SUBSCRIBE", "chat-channel");
co_await connect(conn, "127.0.0.1", "6379");
co_await connect(conn, host, port);
co_await ((conn->async_run() || publisher(stream, conn) || receiver(conn) ||
healthy_checker(conn) || sig.async_wait()) && conn->async_exec(req));
}
#else // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
auto async_main() -> net::awaitable<void>
auto co_main(std::string host, std::string port) -> net::awaitable<void>
{
std::cout << "Requires support for posix streams." << std::endl;
co_return;


@@ -30,14 +30,15 @@ void print(std::vector<int> const& cont)
std::cout << "\n";
}
// Stores the content of some STL containers in Redis.
auto store() -> net::awaitable<void>
auto run(std::shared_ptr<connection> conn, std::string host, std::string port) -> net::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// Resolves and connects (from examples/common.hpp to avoid verbosity)
co_await connect(conn, "127.0.0.1", "6379");
co_await connect(conn, host, port);
co_await conn->async_run();
}
// Stores the content of some STL containers in Redis.
auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
std::vector<int> vec
{1, 2, 3, 4, 5, 6};
@@ -48,70 +49,68 @@ auto store() -> net::awaitable<void>
req.push("HELLO", 3);
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);
req.push("QUIT");
co_await (conn->async_run() || conn->async_exec(req));
co_await conn->async_exec(req);
}
auto hgetall() -> net::awaitable<std::map<std::string, std::string>>
auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// From examples/common.hpp to avoid verbosity
co_await connect(conn, "127.0.0.1", "6379");
// A request contains multiple commands.
resp3::request req;
req.push("HELLO", 3);
req.push("HGETALL", "hset-key");
req.push("QUIT");
// Responses as tuple elements.
std::tuple<aedis::ignore, std::map<std::string, std::string>, aedis::ignore> resp;
std::tuple<aedis::ignore, std::map<std::string, std::string>> resp;
// Executes the request and reads the response.
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
co_return std::get<1>(resp);
co_await conn->async_exec(req, adapt(resp));
print(std::get<1>(resp));
}
// Retrieves in a transaction.
auto transaction() -> net::awaitable<void>
auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// Resolves and connects (from examples/common.hpp to avoid verbosity)
co_await connect(conn, "127.0.0.1", "6379");
resp3::request req;
req.push("HELLO", 3);
req.push("MULTI");
req.push("LRANGE", "rpush-key", 0, -1); // Retrieves
req.push("HGETALL", "hset-key"); // Retrieves
req.push("EXEC");
req.push("QUIT");
std::tuple<
aedis::ignore, // hello
aedis::ignore, // multi
aedis::ignore, // lrange
aedis::ignore, // hgetall
std::tuple<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>>, // exec
aedis::ignore // quit
std::tuple<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>> // exec
> resp;
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
co_await conn->async_exec(req, adapt(resp));
print(std::get<0>(std::get<4>(resp)).value());
print(std::get<1>(std::get<4>(resp)).value());
}
// Called from the main function (see main.cpp)
net::awaitable<void> async_main()
auto quit(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
co_await store();
co_await transaction();
auto const map = co_await hgetall();
print(map);
resp3::request req;
req.push("QUIT");
co_await conn->async_exec(req);
}
// Called from the main function (see main.cpp)
net::awaitable<void> co_main(std::string host, std::string port)
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
net::co_spawn(ex, run(conn, host, port), net::detached);
co_await store(conn);
co_await transaction(conn);
co_await hgetall(conn);
co_await quit(conn);
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)


@@ -45,7 +45,7 @@ auto listener(std::shared_ptr<connection> conn) -> net::awaitable<void>
}
// Called from the main function (see main.cpp)
auto async_main() -> net::awaitable<void>
auto co_main(std::string host, std::string port) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
@@ -54,7 +54,7 @@ auto async_main() -> net::awaitable<void>
resp3::request req;
req.push("HELLO", 3);
co_await connect(conn, "127.0.0.1", "6379");
co_await connect(conn, host, port);
co_await ((conn->async_run() || listener(conn) || healthy_checker(conn) ||
sig.async_wait()) && conn->async_exec(req));
}

examples/cpp20_intro.cpp (new file)

@@ -0,0 +1,61 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <aedis.hpp>
#include "common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::adapt;
using aedis::operation;
auto run(std::shared_ptr<connection> conn, std::string host, std::string port) -> net::awaitable<void>
{
co_await connect(conn, host, port);
co_await conn->async_run();
}
auto hello(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
resp3::request req;
req.push("HELLO", 3);
co_await conn->async_exec(req);
}
auto ping(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
resp3::request req;
req.push("PING", "Hello world");
std::tuple<std::string> resp;
co_await conn->async_exec(req, adapt(resp));
std::cout << "PING: " << std::get<0>(resp) << std::endl;
}
auto quit(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
resp3::request req;
req.push("QUIT");
co_await conn->async_exec(req);
}
// Called from the main function (see main.cpp)
auto co_main(std::string host, std::string port) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
net::co_spawn(ex, run(conn, host, port), net::detached);
co_await hello(conn);
co_await ping(conn);
co_await quit(conn);
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)


@@ -16,7 +16,7 @@ using namespace net::experimental::awaitable_operators;
using aedis::adapt;
// Called from the main function (see main.cpp)
auto async_main() -> net::awaitable<void>
auto co_main(std::string host, std::string port) -> net::awaitable<void>
{
resp3::request req;
req.push("HELLO", 3);
@@ -26,7 +26,7 @@ auto async_main() -> net::awaitable<void>
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
co_await connect(conn, "127.0.0.1", "6379");
co_await connect(conn, host, port);
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
std::cout << "PING: " << std::get<1>(resp) << std::endl;


@@ -29,7 +29,7 @@ auto verify_certificate(bool, net::ssl::verify_context&) -> bool
return true;
}
net::awaitable<void> async_main()
net::awaitable<void> co_main(std::string, std::string)
{
resp3::request req;
req.push("HELLO", 3, "AUTH", "aedis", "aedis");


@@ -17,12 +17,12 @@ using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>
using aedis::adapter::adapt2;
using net::ip::tcp;
auto async_main() -> net::awaitable<void>
auto co_main(std::string host, std::string port) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
resolver resv{ex};
auto const addrs = co_await resv.async_resolve("127.0.0.1", "6379");
auto const addrs = co_await resv.async_resolve(host, port);
tcp_socket socket{ex};
co_await net::async_connect(socket, addrs);


@@ -49,14 +49,14 @@ auto resolve_master_address(std::vector<address> const& endpoints) -> net::await
co_return address{};
}
auto async_main() -> net::awaitable<void>
auto co_main(std::string host, std::string port) -> net::awaitable<void>
{
// A list of sentinel addresses from which only one is responsive
// to simulate sentinels that are down.
// A list of sentinel addresses from which only one is responsive.
// This simulates sentinels that are down.
std::vector<address> const endpoints
{ {"foo", "26379"}
, {"bar", "26379"}
, {"127.0.0.1", "26379"}
, {host, port}
};
auto const ep = co_await resolve_master_address(endpoints);


@@ -86,7 +86,7 @@ void from_bulk(user& u, std::string_view sv, boost::system::error_code&)
u = value_to<user>(jv);
}
net::awaitable<void> async_main()
net::awaitable<void> co_main(std::string host, std::string port)
{
std::set<user> users
{{"Joao", "58", "Brazil"} , {"Serge", "60", "France"}};
@@ -101,7 +101,7 @@ net::awaitable<void> async_main()
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
co_await connect(conn, "127.0.0.1", "6379");
co_await connect(conn, host, port);
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
for (auto const& e: std::get<2>(resp))


@@ -14,7 +14,6 @@
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using signal_set = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using steady_timer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
using aedis::adapt;
@@ -37,18 +36,18 @@ using aedis::adapt;
// Receives pushes.
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
for (std::vector<resp3::node<std::string>> resp;;) {
using resp_type = std::vector<resp3::node<std::string>>;
for (resp_type resp;;) {
co_await conn->async_receive(adapt(resp));
std::cout << resp.at(1).value << " " << resp.at(2).value << " " << resp.at(3).value << std::endl;
resp.clear();
}
}
auto async_main() -> net::awaitable<void>
auto co_main(std::string host, std::string port) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
signal_set sig{ex, SIGINT, SIGTERM};
steady_timer timer{ex};
resp3::request req;
@@ -57,9 +56,8 @@ auto async_main() -> net::awaitable<void>
// The loop will reconnect on connection loss. To exit, type Ctrl-C twice.
for (;;) {
co_await connect(conn, "127.0.0.1", "6379");
co_await ((conn->async_run() || healthy_checker(conn) || sig.async_wait() ||
receiver(conn)) && conn->async_exec(req));
co_await connect(conn, host, port);
co_await ((conn->async_run() || healthy_checker(conn) || receiver(conn)) && conn->async_exec(req));
conn->reset_stream();
timer.expires_after(std::chrono::seconds{1});


@@ -7,17 +7,17 @@
#ifndef AEDIS_ADAPT_HPP
#define AEDIS_ADAPT_HPP
#include <tuple>
#include <limits>
#include <string_view>
#include <variant>
#include <aedis/resp3/node.hpp>
#include <aedis/adapter/adapt.hpp>
#include <aedis/adapter/detail/response_traits.hpp>
#include <boost/mp11.hpp>
#include <boost/system.hpp>
#include <aedis/resp3/node.hpp>
#include <aedis/adapter/adapt.hpp>
#include <aedis/adapter/detail/response_traits.hpp>
#include <tuple>
#include <limits>
#include <string_view>
#include <variant>
namespace aedis {


@@ -7,6 +7,14 @@
#ifndef AEDIS_ADAPTER_ADAPTERS_HPP
#define AEDIS_ADAPTER_ADAPTERS_HPP
#include <aedis/error.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/resp3/node.hpp>
#include <boost/assert.hpp>
#include <set>
#include <optional>
#include <unordered_set>
@@ -21,14 +29,6 @@
#include <string_view>
#include <charconv>
#include <boost/assert.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/resp3/node.hpp>
namespace aedis::adapter::detail {
// Serialization.


@@ -7,18 +7,18 @@
#ifndef AEDIS_ADAPTER_RESPONSE_TRAITS_HPP
#define AEDIS_ADAPTER_RESPONSE_TRAITS_HPP
#include <vector>
#include <tuple>
#include <string_view>
#include <variant>
#include <boost/mp11.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/adapter/detail/adapters.hpp>
#include <boost/mp11.hpp>
#include <vector>
#include <tuple>
#include <string_view>
#include <variant>
namespace aedis::adapter::detail {
using ignore = std::decay_t<decltype(std::ignore)>;


@@ -7,11 +7,12 @@
#ifndef AEDIS_CONNECTION_HPP
#define AEDIS_CONNECTION_HPP
#include <chrono>
#include <memory>
#include <aedis/detail/connection_base.hpp>
#include <boost/asio/io_context.hpp>
#include <aedis/detail/connection_base.hpp>
#include <chrono>
#include <memory>
namespace aedis {
@@ -120,7 +121,7 @@ public:
* @param adapter Response adapter.
* @param token Asio completion token.
*
* For an example see echo_server.cpp. The completion token must
* For an example see cpp20_echo_server.cpp. The completion token must
* have the following signature
*
* @code
@@ -150,7 +151,7 @@ public:
* @param adapter The response adapter.
* @param token The Asio completion token.
*
* For an example see subscriber.cpp. The completion token must
* For an example see cpp20_subscriber.cpp. The completion token must
* have the following signature
*
* @code


@@ -7,13 +7,10 @@
#ifndef AEDIS_CONNECTION_BASE_HPP
#define AEDIS_CONNECTION_BASE_HPP
#include <vector>
#include <queue>
#include <limits>
#include <chrono>
#include <memory>
#include <type_traits>
#include <memory_resource>
#include <aedis/adapt.hpp>
#include <aedis/operation.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/detail/connection_ops.hpp>
#include <boost/assert.hpp>
#include <boost/asio/ip/tcp.hpp>
@@ -22,10 +19,13 @@
#include <boost/asio/deferred.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <aedis/adapt.hpp>
#include <aedis/operation.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/detail/connection_ops.hpp>
#include <vector>
#include <queue>
#include <limits>
#include <chrono>
#include <memory>
#include <type_traits>
#include <memory_resource>
namespace aedis::detail {
@@ -44,7 +44,6 @@ public:
using executor_type = Executor;
using this_type = connection_base<Executor, Derived>;
explicit
connection_base(executor_type ex, std::pmr::memory_resource* resource)
: writer_timer_{ex}
, read_timer_{ex}
@@ -104,7 +103,6 @@ public:
return ret;
}
// Remove requests that have the flag cancel_if_not_sent_when_connection_lost set
auto cancel_on_conn_lost() -> std::size_t
{
// Must return false if the request should be removed.
@@ -113,7 +111,7 @@ public:
BOOST_ASSERT(ptr != nullptr);
if (ptr->is_written()) {
return ptr->get_request().get_config().retry_on_connection_lost;
return !ptr->get_request().get_config().cancel_if_unresponded;
} else {
return !ptr->get_request().get_config().cancel_on_connection_lost;
}
@@ -135,13 +133,8 @@ public:
return ret;
}
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_exec(
resp3::request const& req,
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
template <class Adapter, class CompletionToken>
auto async_exec(resp3::request const& req, Adapter adapter, CompletionToken token)
{
BOOST_ASSERT_MSG(req.size() <= adapter.get_supported_response_size(), "Request and adapter have incompatible sizes.");
@@ -151,12 +144,8 @@ public:
>(detail::exec_op<Derived, Adapter>{&derived(), &req, adapter}, token, writer_timer_);
}
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_receive(
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
template <class Adapter, class CompletionToken>
auto async_receive(Adapter adapter, CompletionToken token)
{
auto f = detail::make_adapter_wrapper(adapter);
@@ -248,8 +237,8 @@ private:
[[nodiscard]] auto get_request() const noexcept -> auto const&
{ return *req_; }
[[nodiscard]] auto get_action() const noexcept
{ return action_;}
[[nodiscard]] auto stop_requested() const noexcept
{ return action_ == action::stop;}
template <class CompletionToken>
auto async_wait(CompletionToken token)

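The defaults removed above belong to this internal base class; the public `aedis::connection` presumably keeps providing them, so user code can still rely on defaults or pass both arguments explicitly. A hedged sketch of the explicit form with the awaitable token, assuming `net` aliases `boost::asio` and `conn` is a connected connection:

```cpp
auto receiver(std::shared_ptr<aedis::connection> conn) -> net::awaitable<void>
{
   // Server pushes are received as a pre-order vector of nodes.
   std::vector<aedis::resp3::node<std::string>> resp;

   for (;;) {
      // Adapter and completion token passed explicitly.
      co_await conn->async_receive(aedis::adapt(resp), net::use_awaitable);
      // ... handle the push stored in resp ...
      resp.clear();
   }
}
```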
View File

@@ -7,16 +7,6 @@
#ifndef AEDIS_CONNECTION_OPS_HPP
#define AEDIS_CONNECTION_OPS_HPP
#include <array>
#include <algorithm>
#include <string_view>
#include <boost/assert.hpp>
#include <boost/system.hpp>
#include <boost/asio/write.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <aedis/adapt.hpp>
#include <aedis/error.hpp>
#include <aedis/detail/guarded_operation.hpp>
@@ -26,7 +16,15 @@
#include <aedis/resp3/write.hpp>
#include <aedis/resp3/request.hpp>
#include <boost/asio/yield.hpp>
#include <boost/assert.hpp>
#include <boost/system.hpp>
#include <boost/asio/write.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <array>
#include <algorithm>
#include <string_view>
namespace aedis::detail {
@@ -45,7 +43,7 @@ struct exec_read_op {
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
BOOST_ASIO_CORO_REENTER (coro)
{
// Loop reading the responses to this request.
BOOST_ASSERT(!conn->reqs_.empty());
@@ -57,7 +55,7 @@ struct exec_read_op {
// to hand it to the push consumer. To do that we need
// some data in the read buffer.
if (conn->read_buffer_.empty()) {
yield
BOOST_ASIO_CORO_YIELD
boost::asio::async_read_until(
conn->next_layer(),
conn->make_dynamic_buffer(),
@@ -68,13 +66,14 @@ struct exec_read_op {
// If the next message is a push we have to hand it to
// the receive_op, wait for it to be done and continue.
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push) {
yield conn->guarded_op_.async_run(std::move(self));
BOOST_ASIO_CORO_YIELD
conn->guarded_op_.async_run(std::move(self));
AEDIS_CHECK_OP1(conn->cancel(operation::run););
continue;
}
//-----------------------------------
yield
BOOST_ASIO_CORO_YIELD
resp3::async_read(
conn->next_layer(),
conn->make_dynamic_buffer(adapter.get_max_read_size(index)),
@@ -116,7 +115,7 @@ struct exec_op {
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
BOOST_ASIO_CORO_REENTER (coro)
{
// Check whether the user wants to wait for the connection to
// be established.
@@ -128,10 +127,11 @@ struct exec_op {
conn->add_request_info(info);
EXEC_OP_WAIT:
yield info->async_wait(std::move(self));
BOOST_ASIO_CORO_YIELD
info->async_wait(std::move(self));
BOOST_ASSERT(ec == boost::asio::error::operation_aborted);
if (info->get_action() == Conn::req_info::action::stop) {
if (info->stop_requested()) {
// Don't have to call remove_request as it has already
// been done by cancel(exec).
return self.complete(ec, 0);
@@ -139,9 +139,20 @@ EXEC_OP_WAIT:
if (is_cancelled(self)) {
if (info->is_written()) {
self.get_cancellation_state().clear();
goto EXEC_OP_WAIT; // Too late, can't cancel.
using c_t = boost::asio::cancellation_type;
auto const c = self.get_cancellation_state().cancelled();
if ((c & c_t::terminal) != c_t::none) {
// Cancellation requires closing the connection
// otherwise it stays in an inconsistent state.
conn->cancel(operation::run);
return self.complete(ec, 0);
} else {
// Can't implement other cancellation types, ignoring.
self.get_cancellation_state().clear();
goto EXEC_OP_WAIT;
}
} else {
// Cancellation can be honored.
conn->remove_request(info);
self.complete(ec, 0);
return;
@@ -159,7 +170,7 @@ EXEC_OP_WAIT:
BOOST_ASSERT(!conn->reqs_.empty());
BOOST_ASSERT(conn->reqs_.front() != nullptr);
BOOST_ASSERT(conn->cmds_ != 0);
yield
BOOST_ASIO_CORO_YIELD
conn->async_exec_read(adapter, conn->reqs_.front()->get_number_of_commands(), std::move(self));
AEDIS_CHECK_OP1(;);
@@ -193,12 +204,12 @@ struct run_op {
, boost::system::error_code ec0 = {}
, boost::system::error_code ec1 = {})
{
reenter (coro)
BOOST_ASIO_CORO_REENTER (coro)
{
conn->write_buffer_.clear();
conn->cmds_ = 0;
yield
BOOST_ASIO_CORO_YIELD
boost::asio::experimental::make_parallel_group(
[this](auto token) { return conn->reader(token);},
[this](auto token) { return conn->writer(token);}
@@ -232,11 +243,11 @@ struct writer_op {
{
boost::ignore_unused(n);
reenter (coro) for (;;)
BOOST_ASIO_CORO_REENTER (coro) for (;;)
{
while (!conn->reqs_.empty() && conn->cmds_ == 0 && conn->write_buffer_.empty()) {
conn->coalesce_requests();
yield
BOOST_ASIO_CORO_YIELD
boost::asio::async_write(conn->next_layer(), boost::asio::buffer(conn->write_buffer_), std::move(self));
AEDIS_CHECK_OP0(conn->cancel(operation::run););
@@ -251,7 +262,8 @@ struct writer_op {
}
}
yield conn->writer_timer_.async_wait(std::move(self));
BOOST_ASIO_CORO_YIELD
conn->writer_timer_.async_wait(std::move(self));
if (!conn->is_open() || is_cancelled(self)) {
// Notice this is not an error of the op, stopping was
// requested from the outside, so we complete with
@@ -275,9 +287,9 @@ struct reader_op {
{
boost::ignore_unused(n);
reenter (coro) for (;;)
BOOST_ASIO_CORO_REENTER (coro) for (;;)
{
yield
BOOST_ASIO_CORO_YIELD
boost::asio::async_read_until(
conn->next_layer(),
conn->make_dynamic_buffer(),
@@ -311,13 +323,15 @@ struct reader_op {
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push
|| conn->reqs_.empty()
|| (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)) {
yield conn->guarded_op_.async_run(std::move(self));
BOOST_ASIO_CORO_YIELD
conn->guarded_op_.async_run(std::move(self));
} else {
BOOST_ASSERT(conn->cmds_ != 0);
BOOST_ASSERT(!conn->reqs_.empty());
BOOST_ASSERT(conn->reqs_.front()->get_number_of_commands() != 0);
conn->reqs_.front()->proceed();
yield conn->read_timer_.async_wait(std::move(self));
BOOST_ASIO_CORO_YIELD
conn->read_timer_.async_wait(std::move(self));
ec = {};
}
@@ -332,5 +346,4 @@ struct reader_op {
} // aedis::detail
#include <boost/asio/unyield.hpp>
#endif // AEDIS_CONNECTION_OPS_HPP

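The new branch in `exec_op` above honours only terminal per-operation cancellation once the request has been written, and reacts by closing the connection (`operation::run`) to keep the protocol stream consistent. A hedged fragment showing how that path can be triggered from user code with Asio's cancellation slots; `conn` and `req` are assumed to exist as in the other examples and includes are omitted:

```cpp
boost::asio::cancellation_signal sig;

conn.async_exec(req, aedis::adapt(),
   boost::asio::bind_cancellation_slot(sig.slot(),
      [](boost::system::error_code ec, std::size_t) {
         // If the emit below arrives after the request was written, the
         // operation completes with operation_aborted and exec_op closes
         // the connection.
         std::cout << "async_exec: " << ec.message() << "\n";
      }));

// Elsewhere, e.g. in a timeout handler:
sig.emit(boost::asio::cancellation_type::terminal);
```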
View File

@@ -8,7 +8,6 @@
#define AEDIS_DETAIL_GUARDED_OPERATION_HPP
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/yield.hpp>
namespace aedis::detail {
@@ -21,12 +20,14 @@ struct send_receive_op {
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
{
reenter (coro)
BOOST_ASIO_CORO_REENTER (coro)
{
yield channel->async_send(boost::system::error_code{}, 0, std::move(self));
BOOST_ASIO_CORO_YIELD
channel->async_send(boost::system::error_code{}, 0, std::move(self));
AEDIS_CHECK_OP0(;);
yield channel->async_send(boost::system::error_code{}, 0, std::move(self));
BOOST_ASIO_CORO_YIELD
channel->async_send(boost::system::error_code{}, 0, std::move(self));
AEDIS_CHECK_OP0(;);
self.complete({});
@@ -48,17 +49,20 @@ struct wait_op {
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
BOOST_ASIO_CORO_REENTER (coro)
{
yield channel->async_receive(std::move(self));
BOOST_ASIO_CORO_YIELD
channel->async_receive(std::move(self));
AEDIS_CHECK_OP1(;);
yield std::move(op)(std::move(self));
BOOST_ASIO_CORO_YIELD
std::move(op)(std::move(self));
AEDIS_CHECK_OP1(channel->cancel(););
res = n;
yield channel->async_receive(std::move(self));
BOOST_ASIO_CORO_YIELD
channel->async_receive(std::move(self));
AEDIS_CHECK_OP1(;);
self.complete({}, res);
@@ -67,14 +71,14 @@ struct wait_op {
}
};
template <class Executor = boost::asio::any_io_executor>
template <class Executor>
class guarded_operation {
public:
using executor_type = Executor;
guarded_operation(executor_type ex) : channel_{ex} {}
template <class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_run(CompletionToken&& token = CompletionToken{})
template <class CompletionToken>
auto async_run(CompletionToken&& token)
{
return boost::asio::async_compose
< CompletionToken
@@ -82,8 +86,8 @@ public:
>(send_receive_op<executor_type>{&channel_}, token, channel_);
}
template <class Op, class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_wait(Op&& op, CompletionToken token = CompletionToken{})
template <class Op, class CompletionToken>
auto async_wait(Op&& op, CompletionToken token)
{
return boost::asio::async_compose
< CompletionToken
@@ -104,5 +108,4 @@ private:
} // aedis::detail
#include <boost/asio/unyield.hpp>
#endif // AEDIS_DETAIL_GUARDED_OPERATION_HPP

View File

@@ -5,6 +5,7 @@
*/
#include <aedis/error.hpp>
#include <boost/assert.hpp>
namespace aedis {
namespace detail {

View File

@@ -0,0 +1,156 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/error.hpp>
#include <boost/assert.hpp>
#include <charconv>
namespace aedis::resp3::detail {
void to_int(int_type& i, std::string_view sv, boost::system::error_code& ec)
{
auto const res = std::from_chars(sv.data(), sv.data() + std::size(sv), i);
if (res.ec != std::errc())
ec = error::not_a_number;
}
parser::parser()
{
sizes_[0] = 2; // The sentinel must be more than 1.
}
auto
parser::consume(
char const* data,
std::size_t n,
boost::system::error_code& ec) -> std::pair<node_type, std::size_t>
{
node_type ret;
if (bulk_expected()) {
n = bulk_length_ + 2;
ret = {bulk_, 1, depth_, {data, bulk_length_}};
bulk_ = type::invalid;
--sizes_[depth_];
} else if (sizes_[depth_] != 0) {
auto const t = to_type(*data);
switch (t) {
case type::streamed_string_part:
{
to_int(bulk_length_ , std::string_view{data + 1, n - 3}, ec);
if (ec)
return std::make_pair(node_type{}, 0);
if (bulk_length_ == 0) {
ret = {type::streamed_string_part, 1, depth_, {}};
sizes_[depth_] = 0; // We are done.
bulk_ = type::invalid;
} else {
bulk_ = type::streamed_string_part;
}
} break;
case type::blob_error:
case type::verbatim_string:
case type::blob_string:
{
if (data[1] == '?') {
// NOTE: This can only be triggered with blob_string.
// Trick: A streamed string is read as an aggregate
// of infinite length. When the streaming is done
// the server is supposed to send a part with length
// 0.
sizes_[++depth_] = (std::numeric_limits<std::size_t>::max)();
ret = {type::streamed_string, 0, depth_, {}};
} else {
to_int(bulk_length_ , std::string_view{data + 1, n - 3} , ec);
if (ec)
return std::make_pair(node_type{}, 0);
bulk_ = t;
}
} break;
case type::boolean:
{
if (n == 3) {
ec = error::empty_field;
return std::make_pair(node_type{}, 0);
}
if (data[1] != 'f' && data[1] != 't') {
ec = error::unexpected_bool_value;
return std::make_pair(node_type{}, 0);
}
ret = {t, 1, depth_, {data + 1, n - 3}};
--sizes_[depth_];
} break;
case type::doublean:
case type::big_number:
case type::number:
{
if (n == 3) {
ec = error::empty_field;
return std::make_pair(node_type{}, 0);
}
ret = {t, 1, depth_, {data + 1, n - 3}};
--sizes_[depth_];
} break;
case type::simple_error:
case type::simple_string:
{
ret = {t, 1, depth_, {&data[1], n - 3}};
--sizes_[depth_];
} break;
case type::null:
{
ret = {type::null, 1, depth_, {}};
--sizes_[depth_];
} break;
case type::push:
case type::set:
case type::array:
case type::attribute:
case type::map:
{
int_type l = -1;
to_int(l, std::string_view{data + 1, n - 3}, ec);
if (ec)
return std::make_pair(node_type{}, 0);
ret = {t, l, depth_, {}};
if (l == 0) {
--sizes_[depth_];
} else {
if (depth_ == max_embedded_depth) {
ec = error::exceeeds_max_nested_depth;
return std::make_pair(node_type{}, 0);
}
++depth_;
sizes_[depth_] = l * element_multiplicity(t);
}
} break;
default:
{
ec = error::invalid_data_type;
return std::make_pair(node_type{}, 0);
}
}
}
while (sizes_[depth_] == 0) {
--depth_;
--sizes_[depth_];
}
return std::make_pair(ret, n);
}
} // aedis::resp3::detail

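A hedged sketch of how the consume loop above is meant to be driven, mirroring the synchronous read shown further down. Note that `parser` lives in a detail namespace and is not a public API, so this is illustration only; the whole message is assumed to already be in the buffer.

```cpp
#include <aedis.hpp>
#include <aedis/src.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <iostream>
#include <string>
#include <string_view>

int main()
{
   std::string const buf = "+PONG\r\n";
   aedis::resp3::detail::parser p;
   boost::system::error_code ec;
   std::size_t pos = 0;

   do {
      char const* data = buf.data() + pos;

      // A header line runs until "\r\n"; a bulk has a known length.
      std::size_t const n = p.bulk_expected()
         ? p.bulk_length() + 2
         : buf.find("\r\n", pos) - pos + 2;

      auto const res = p.consume(data, n, ec);
      if (ec) return 1;

      // A node is only complete when the parser is not waiting for a bulk.
      if (!p.bulk_expected())
         std::cout << to_string(res.first.data_type) << ": "
                   << res.first.value << "\n";

      pos += res.second;
   } while (!p.done());
}
```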
View File

@@ -7,38 +7,22 @@
#ifndef AEDIS_RESP3_PARSER_HPP
#define AEDIS_RESP3_PARSER_HPP
#include <aedis/resp3/node.hpp>
#include <array>
#include <limits>
#include <system_error>
#include <charconv>
#include <string_view>
#include <cstdint>
#include <boost/assert.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/node.hpp>
namespace aedis::resp3::detail {
using int_type = std::uint64_t;
inline
void to_int(int_type& i, std::string_view sv, boost::system::error_code& ec)
{
auto const res = std::from_chars(sv.data(), sv.data() + std::size(sv), i);
if (res.ec != std::errc())
ec = error::not_a_number;
}
template <class ResponseAdapter>
class parser {
private:
using node_type = node<std::string_view>;
static constexpr std::size_t max_embedded_depth = 5;
ResponseAdapter adapter_;
// The current depth. Simple data types will have depth 0, whereas
// the elements of aggregates will have depth 1. Embedded types
// will have increasing depth.
@@ -57,176 +41,27 @@ private:
type bulk_ = type::invalid;
public:
explicit parser(ResponseAdapter adapter)
: adapter_{adapter}
{
sizes_[0] = 2; // The sentinel must be more than 1.
}
parser();
// Returns the number of bytes that have been consumed.
auto
consume(char const* data, std::size_t n, boost::system::error_code& ec) -> std::size_t
{
if (bulk_ != type::invalid) {
n = bulk_length_ + 2;
switch (bulk_) {
case type::streamed_string_part:
{
BOOST_ASSERT(bulk_length_ != 0);
adapter_({bulk_, 1, depth_, {data, bulk_length_}}, ec);
if (ec)
return 0;
} break;
default:
{
adapter_({bulk_, 1, depth_, {data, bulk_length_}}, ec);
if (ec)
return 0;
}
}
bulk_ = type::invalid;
--sizes_[depth_];
} else if (sizes_[depth_] != 0) {
auto const t = to_type(*data);
switch (t) {
case type::streamed_string_part:
{
to_int(bulk_length_ , std::string_view{data + 1, n - 3}, ec);
if (ec)
return 0;
if (bulk_length_ == 0) {
adapter_({type::streamed_string_part, 1, depth_, {}}, ec);
sizes_[depth_] = 0; // We are done.
} else {
bulk_ = type::streamed_string_part;
}
} break;
case type::blob_error:
case type::verbatim_string:
case type::blob_string:
{
if (data[1] == '?') {
// NOTE: This can only be triggered with blob_string.
// Trick: A streamed string is read as an aggregate
// of infinite length. When the streaming is done
// the server is supposed to send a part with length
// 0.
sizes_[++depth_] = (std::numeric_limits<std::size_t>::max)();
} else {
to_int(bulk_length_ , std::string_view{data + 1, n - 3} , ec);
if (ec)
return 0;
bulk_ = t;
}
} break;
case type::boolean:
{
if (n == 3) {
ec = error::empty_field;
return 0;
}
if (data[1] != 'f' && data[1] != 't') {
ec = error::unexpected_bool_value;
return 0;
}
adapter_({t, 1, depth_, {data + 1, n - 3}}, ec);
if (ec)
return 0;
--sizes_[depth_];
} break;
case type::doublean:
case type::big_number:
case type::number:
{
if (n == 3) {
ec = error::empty_field;
return 0;
}
adapter_({t, 1, depth_, {data + 1, n - 3}}, ec);
if (ec)
return 0;
--sizes_[depth_];
} break;
case type::simple_error:
case type::simple_string:
{
adapter_({t, 1, depth_, {&data[1], n - 3}}, ec);
if (ec)
return 0;
--sizes_[depth_];
} break;
case type::null:
{
adapter_({type::null, 1, depth_, {}}, ec);
if (ec)
return 0;
--sizes_[depth_];
} break;
case type::push:
case type::set:
case type::array:
case type::attribute:
case type::map:
{
int_type l = -1;
to_int(l, std::string_view{data + 1, n - 3}, ec);
if (ec)
return 0;
adapter_({t, l, depth_, {}}, ec);
if (ec)
return 0;
if (l == 0) {
--sizes_[depth_];
} else {
if (depth_ == max_embedded_depth) {
ec = error::exceeeds_max_nested_depth;
return 0;
}
++depth_;
sizes_[depth_] = l * element_multiplicity(t);
}
} break;
default:
{
ec = error::invalid_data_type;
return 0;
}
}
}
while (sizes_[depth_] == 0) {
--depth_;
--sizes_[depth_];
}
return n;
}
consume(
char const* data,
std::size_t n,
boost::system::error_code& ec) -> std::pair<node_type, std::size_t>;
// Returns true when the parser is done with the current message.
[[nodiscard]] auto done() const noexcept
{ return depth_ == 0 && bulk_ == type::invalid; }
// The bulk type expected in the next read. If none is expected returns
// type::invalid.
[[nodiscard]] auto bulk() const noexcept { return bulk_; }
// Returns true when a bulk is expected in the next read.
[[nodiscard]] auto bulk_expected() const noexcept -> bool
{ return bulk_ != type::invalid; }
// The length expected in the the next bulk.
[[nodiscard]] auto bulk_length() const noexcept { return bulk_length_; }
[[nodiscard]] auto bulk_length() const noexcept
{ return bulk_length_; }
};
} // aedis::resp3::detail

View File

@@ -7,15 +7,15 @@
#ifndef AEDIS_RESP3_READ_OPS_HPP
#define AEDIS_RESP3_READ_OPS_HPP
#include <string_view>
#include <aedis/resp3/detail/parser.hpp>
#include <boost/assert.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/core/ignore_unused.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <boost/asio/yield.hpp>
#include <string_view>
namespace aedis::detail
{
@@ -61,7 +61,8 @@ class parse_op {
private:
AsyncReadStream& stream_;
DynamicBuffer buf_;
parser<ResponseAdapter> parser_;
parser parser_;
ResponseAdapter adapter_;
std::size_t consumed_ = 0;
std::size_t buffer_size_ = 0;
boost::asio::coroutine coro_{};
@@ -70,7 +71,7 @@ public:
parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter)
: stream_ {stream}
, buf_ {std::move(buf)}
, parser_ {std::move(adapter)}
, adapter_ {std::move(adapter)}
{ }
template <class Self>
@@ -78,9 +79,9 @@ public:
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro_) for (;;) {
if (parser_.bulk() == type::invalid) {
yield
BOOST_ASIO_CORO_REENTER (coro_) for (;;) {
if (!parser_.bulk_expected()) {
BOOST_ASIO_CORO_YIELD
boost::asio::async_read_until(stream_, buf_, "\r\n", std::move(self));
AEDIS_CHECK_OP1(;);
} else {
@@ -95,7 +96,7 @@ public:
buffer_size_ = buf_.size();
buf_.grow(parser_.bulk_length() + 2 - buffer_size_);
yield
BOOST_ASIO_CORO_YIELD
boost::asio::async_read(
stream_,
buf_.data(buffer_size_, parser_.bulk_length() + 2 - buffer_size_),
@@ -108,14 +109,18 @@ public:
BOOST_ASSERT(buf_.size() >= n);
}
n = parser_.consume(static_cast<char const*>(buf_.data(0, n).data()), n, ec);
if (ec) {
self.complete(ec, 0);
return;
auto const res = parser_.consume(static_cast<char const*>(buf_.data(0, n).data()), n, ec);
if (ec)
return self.complete(ec, 0);
if (!parser_.bulk_expected()) {
adapter_(res.first, ec);
if (ec)
return self.complete(ec, 0);
}
buf_.consume(n);
consumed_ += n;
buf_.consume(res.second);
consumed_ += res.second;
if (parser_.done()) {
self.complete({}, consumed_);
return;
@@ -126,5 +131,4 @@ public:
} // aedis::resp3::detail
#include <boost/asio/unyield.hpp>
#endif // AEDIS_RESP3_READ_OPS_HPP

View File

@@ -4,12 +4,13 @@
* accompanying file LICENSE.txt)
*/
#include <string_view>
#include <aedis/resp3/request.hpp>
#include <string_view>
namespace aedis::resp3::detail {
auto has_push_response(std::string_view cmd) -> bool
auto has_response(std::string_view cmd) -> bool
{
if (cmd == "SUBSCRIBE") return true;
if (cmd == "PSUBSCRIBE") return true;

View File

@@ -4,8 +4,8 @@
* accompanying file LICENSE.txt)
*/
#include <boost/assert.hpp>
#include <aedis/resp3/type.hpp>
#include <boost/assert.hpp>
namespace aedis::resp3 {
@@ -27,6 +27,7 @@ auto to_string(type t) -> char const*
case type::blob_error: return "blob_error";
case type::verbatim_string: return "verbatim_string";
case type::blob_string: return "blob_string";
case type::streamed_string: return "streamed_string";
case type::streamed_string_part: return "streamed_string_part";
default: return "invalid";
}

View File

@@ -14,11 +14,33 @@ namespace aedis::resp3 {
/** \brief A node in the response tree.
* \ingroup high-level-api
*
* Redis responses are the pre-order view of the response tree (see
* https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR).
* RESP3 responses can contain recursive data structures: a map of sets of
* vectors and so on. As the response is parsed, each element is passed to a
* user callback (push parser), which is what the `aedis::adapt` function
* provides. The signature of that callback is
* `f(resp3::node<std::string_view>)`. This class is called a node because it
* can be seen as an element of the response tree. It is a template so that
* users can also use owning strings, e.g. `std::string` or
* `boost::static_string`, if they decide to use a node as the response
* type. For example, to read a non-aggregate data type use
*
* \remark Any Redis response can be received in an array of nodes,
* for example \c std::vector<node<std::string>>.
* ```cpp
* resp3::node<std::string> resp;
* co_await conn->async_exec(req, adapt(resp));
* ```
*
* for an aggregate use instead
*
* ```cpp
* std::vector<resp3::node<std::string>> resp;
* co_await conn->async_exec(req, adapt(resp));
* ```
*
* The vector will contain the
* [pre-order](https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR)
* view of the response tree. Any Redis response can be received in
* an array of nodes as shown above.
*
* \tparam String A `std::string`-like type.
*/
template <class String>
struct node {

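To make the pre-order layout concrete, here is a hedged sketch that reads a RESP3 map (`HGETALL`) into the node vector and prints each node indented by its depth; `net` aliases `boost::asio`, `conn` is a connected connection and the key name is an assumed example.

```cpp
auto hgetall(std::shared_ptr<aedis::connection> conn) -> net::awaitable<void>
{
   aedis::resp3::request req;
   req.push("HGETALL", "hset-key");

   std::vector<aedis::resp3::node<std::string>> resp;
   co_await conn->async_exec(req, aedis::adapt(resp), net::use_awaitable);

   // Pre-order view: the map header comes first (depth 0), followed by its
   // field/value pairs at depth 1.
   for (auto const& e : resp)
      std::cout << std::string(2 * e.depth, ' ')
                << to_string(e.data_type) << ": " << e.value << "\n";
}
```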
View File

@@ -59,11 +59,11 @@ read(
ResponseAdapter adapter,
boost::system::error_code& ec) -> std::size_t
{
detail::parser<ResponseAdapter> p {adapter};
detail::parser p;
std::size_t n = 0;
std::size_t consumed = 0;
do {
if (p.bulk() == type::invalid) {
if (!p.bulk_expected()) {
n = boost::asio::read_until(stream, buf, "\r\n", ec);
if (ec)
return 0;
@@ -81,12 +81,18 @@ read(
}
auto const* data = static_cast<char const*>(buf.data(0, n).data());
n = p.consume(data, n, ec);
auto const res = p.consume(data, n, ec);
if (ec)
return 0;
buf.consume(n);
consumed += n;
if (!p.bulk_expected()) {
adapter(res.first, ec);
if (ec)
return 0;
}
buf.consume(res.second);
consumed += res.second;
} while (!p.done());
return consumed;

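A hedged sketch of the synchronous, low-level API whose read side is shown above, assuming a Redis server on localhost. `resp3::write` is assumed here to be the synchronous counterpart provided by `aedis/resp3/write.hpp` (which the connection implementation includes); it is not part of this diff.

```cpp
#include <aedis.hpp>
#include <aedis/src.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <string>

namespace net = boost::asio;
namespace resp3 = aedis::resp3;

int main()
{
   net::io_context ioc;
   net::ip::tcp::socket socket{ioc};
   net::ip::tcp::resolver resv{ioc};
   net::connect(socket, resv.resolve("127.0.0.1", "6379"));

   resp3::request req;
   req.push("PING", "Hello");

   // Assumed synchronous write of the serialized request.
   resp3::write(socket, req);

   std::string buffer, resp;
   boost::system::error_code ec;
   resp3::read(socket, net::dynamic_buffer(buffer), aedis::adapt(resp), ec);

   std::cout << (ec ? ec.message() : resp) << "\n";
}
```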
View File

@@ -7,10 +7,11 @@
#ifndef AEDIS_RESP3_REQUEST_HPP
#define AEDIS_RESP3_REQUEST_HPP
#include <aedis/resp3/type.hpp>
#include <string>
#include <tuple>
#include <memory_resource>
#include <aedis/resp3/type.hpp>
// NOTE: Consider detecting tuples among the types in the parameter pack
// to calculate the header size correctly.
@@ -62,7 +63,7 @@ void to_bulk(Request& to, T n)
namespace detail {
auto has_push_response(std::string_view cmd) -> bool;
auto has_response(std::string_view cmd) -> bool;
template <class T>
struct add_bulk_impl {
@@ -169,35 +170,37 @@ class request {
public:
/// Request configuration options.
struct config {
/** \brief Setting it to true will cause
* `aedis::connection::async_exec` to complete with error if the
/** \brief If `true`
* `aedis::connection::async_exec` will complete with error if the
* connection is lost. Affects only requests that haven't been
* sent yet.
*/
bool cancel_on_connection_lost = true;
/** \brief If true the request will be coalesced with other
/** \brief If `true` the request will be coalesced with other
* requests, see https://redis.io/topics/pipelining. Otherwise
* the request is sent individually.
*/
bool coalesce = true;
/** \brief If true, the request will complete with error if the
* call happens before the connection with Redis was established.
/** \brief If `true` the request will complete with
* aedis::error::not_connected if `async_exec` is called before
* the connection with Redis has been established.
*/
bool cancel_if_not_connected = false;
/** \brief If true `aedis::connection::async_exec` will not
* cancel this request if the connection is lost. Affects only
* requests that have been written to the socket but remained
* unresponded when `aedis::connection::async_run` completed.
/** \brief If `false` `aedis::connection::async_exec` will not
* automatically cancel this request if the connection is lost.
* Affects only requests that have been written to the socket
* but remained unresponded when `aedis::connection::async_run`
* completed.
*/
bool retry_on_connection_lost = false;
bool cancel_if_unresponded = true;
/** \brief If this request has a HELLO command and this flag is
* true, the `aedis::connection` will move it to the front of
/** \brief If this request has a `HELLO` command and this flag is
* `true`, the `aedis::connection` will move it to the front of
* the queue of awaiting requests. This makes it possible to
* send HELLO and authenticate before other commands are sent.
* send `HELLO` and authenticate before other commands are sent.
*/
bool hello_with_priority = true;
};
@@ -208,7 +211,7 @@ public:
* \param resource Memory resource.
*/
explicit
request(config cfg = config{true, true, false, false, true},
request(config cfg = config{true, true, false, true, true},
std::pmr::memory_resource* resource = std::pmr::get_default_resource())
: cfg_{cfg}, payload_(resource) {}
@@ -384,7 +387,7 @@ public:
private:
void check_cmd(std::string_view cmd)
{
if (!detail::has_push_response(cmd))
if (!detail::has_response(cmd))
++commands_;
if (cmd == "HELLO")

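A hedged sketch of setting the flags documented above. With this combination the PING is dropped if the connection is lost before it was written, but kept and retried on reconnection if it was written and left unresponded.

```cpp
aedis::resp3::request req;
req.get_config().coalesce = true;                   // pipeline with other requests
req.get_config().cancel_on_connection_lost = true;  // drop if never written
req.get_config().cancel_if_unresponded = false;     // keep if written but unanswered
req.get_config().cancel_if_not_connected = false;   // queue even when not connected
req.push("PING");
```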
View File

@@ -50,6 +50,8 @@ enum class type
/// Simple
blob_string,
/// Simple
streamed_string,
/// Simple
streamed_string_part,
/// Invalid
invalid

View File

@@ -7,3 +7,4 @@
#include <aedis/impl/error.ipp>
#include <aedis/resp3/impl/request.ipp>
#include <aedis/resp3/impl/type.ipp>
#include <aedis/resp3/detail/impl/parser.ipp>

View File

@@ -7,11 +7,12 @@
#ifndef AEDIS_SSL_CONNECTION_HPP
#define AEDIS_SSL_CONNECTION_HPP
#include <chrono>
#include <memory>
#include <aedis/detail/connection_base.hpp>
#include <boost/asio/io_context.hpp>
#include <aedis/detail/connection_base.hpp>
#include <chrono>
#include <memory>
namespace aedis::ssl {

View File

@@ -122,123 +122,3 @@ BOOST_AUTO_TEST_CASE(cancel_request_if_not_connected)
ioc.run();
}
// TODO: This test is broken.
BOOST_AUTO_TEST_CASE(request_retry_false)
{
resp3::request req0;
req0.get_config().coalesce = false;
req0.get_config().cancel_on_connection_lost = true;
req0.push("HELLO", 3);
resp3::request req1;
req1.get_config().coalesce = true;
req1.get_config().cancel_on_connection_lost = true;
req1.push("BLPOP", "any", 0);
resp3::request req2;
req2.get_config().coalesce = true;
req2.get_config().cancel_on_connection_lost = false;
req2.get_config().retry_on_connection_lost = false;
req2.push("PING");
net::io_context ioc;
connection conn{ioc};
net::steady_timer st{ioc};
st.expires_after(std::chrono::seconds{1});
st.async_wait([&](auto){
// Cancels the request before receiving the response. This
// should cause the second request to complete with error
// although it has cancel_on_connection_lost = false.
conn.cancel(aedis::operation::run);
});
auto const endpoints = resolve();
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req0, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
conn.async_exec(req1, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_exec(req2, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_run([](auto ec){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
ioc.run();
}
BOOST_AUTO_TEST_CASE(request_retry_true)
{
resp3::request req0;
req0.get_config().coalesce = false;
req0.get_config().cancel_on_connection_lost = true;
req0.push("HELLO", 3);
resp3::request req1;
req1.get_config().coalesce = true;
req1.get_config().cancel_on_connection_lost = true;
req1.push("BLPOP", "any", 0);
resp3::request req2;
req2.get_config().coalesce = true;
req2.get_config().cancel_on_connection_lost = false;
req2.get_config().retry_on_connection_lost = true;
req2.push("PING");
resp3::request req3;
req3.get_config().coalesce = true;
req3.get_config().cancel_on_connection_lost = true;
req3.get_config().retry_on_connection_lost = false;
req3.push("QUIT");
net::io_context ioc;
connection conn{ioc};
net::steady_timer st{ioc};
st.expires_after(std::chrono::seconds{1});
st.async_wait([&](auto){
// Cancels the request before receiving the response. This
// should cause the second request to complete with error
// although it has cancel_on_connection_lost = false.
conn.cancel(aedis::operation::run);
});
auto const endpoints = resolve();
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req0, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
conn.async_exec(req1, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_exec(req2, adapt(), [&](auto ec, auto){
BOOST_TEST(!ec);
conn.async_exec(req3, adapt(), [&](auto ec, auto){
BOOST_TEST(!ec);
});
});
conn.async_run([&](auto ec){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
conn.reset_stream();
net::connect(conn.next_layer(), endpoints);
conn.async_run([&](auto ec){
std::cout << ec.message() << std::endl;
BOOST_TEST(!ec);
});
});
ioc.run();
}

View File

@@ -110,7 +110,7 @@ auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>
// Will be cancelled before it is written.
resp3::request req2;
req2.get_config().coalesce = false;
//req2.get_config().cancel_on_connection_lost = true;
req2.get_config().cancel_on_connection_lost = true;
req2.push("PING");
net::steady_timer st{ex};
@@ -123,11 +123,9 @@ auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>
st.async_wait(redir(ec3))
);
BOOST_TEST(!ec1);
BOOST_CHECK_EQUAL(ec1, net::error::basic_errors::operation_aborted);
BOOST_CHECK_EQUAL(ec2, net::error::basic_errors::operation_aborted);
BOOST_TEST(!ec3);
conn->cancel(operation::run);
}
auto cancel_of_req_written_on_run_canceled() -> net::awaitable<void>
@@ -138,7 +136,7 @@ auto cancel_of_req_written_on_run_canceled() -> net::awaitable<void>
resp3::request req1;
req1.get_config().cancel_on_connection_lost = true;
req1.get_config().retry_on_connection_lost = false;
req1.get_config().cancel_if_unresponded = true;
req1.push("BLPOP", "any", 0);
auto ex = co_await net::this_coro::executor;

tests/conn_exec_retry.cpp Normal file
View File

@@ -0,0 +1,148 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <iostream>
#include <boost/asio.hpp>
#include <boost/system/errc.hpp>
#define BOOST_TEST_MODULE low level
#include <boost/test/included/unit_test.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
#include "common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using error_code = boost::system::error_code;
using connection = aedis::connection;
using aedis::adapt;
BOOST_AUTO_TEST_CASE(request_retry_false)
{
resp3::request req0;
req0.get_config().coalesce = false;
req0.get_config().cancel_on_connection_lost = true;
req0.push("HELLO", 3);
resp3::request req1;
req1.get_config().coalesce = true;
req1.get_config().cancel_on_connection_lost = true;
req1.push("BLPOP", "any", 0);
resp3::request req2;
req2.get_config().coalesce = true;
req2.get_config().cancel_on_connection_lost = false;
req2.get_config().cancel_if_unresponded = true;
req2.push("PING");
net::io_context ioc;
connection conn{ioc};
net::steady_timer st{ioc};
st.expires_after(std::chrono::seconds{1});
st.async_wait([&](auto){
// Cancels the request before receiving the response. This
// should cause the third request to complete with error
// although it has cancel_on_connection_lost = false. The reason
// being that it has already been written, so
// cancel_on_connection_lost does not apply.
conn.cancel(aedis::operation::run);
});
auto const endpoints = resolve();
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req0, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
conn.async_exec(req1, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_exec(req2, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_run([](auto ec){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
ioc.run();
}
BOOST_AUTO_TEST_CASE(request_retry_true)
{
resp3::request req0;
req0.get_config().coalesce = false;
req0.get_config().cancel_on_connection_lost = true;
req0.push("HELLO", 3);
resp3::request req1;
req1.get_config().coalesce = true;
req1.get_config().cancel_on_connection_lost = true;
req1.push("BLPOP", "any", 0);
resp3::request req2;
req2.get_config().coalesce = true;
req2.get_config().cancel_on_connection_lost = false;
req2.get_config().cancel_if_unresponded = false;
req2.push("PING");
resp3::request req3;
req3.get_config().coalesce = true;
req3.get_config().cancel_on_connection_lost = true;
req3.get_config().cancel_if_unresponded = true;
req3.push("QUIT");
net::io_context ioc;
connection conn{ioc};
net::steady_timer st{ioc};
st.expires_after(std::chrono::seconds{1});
st.async_wait([&](auto){
// Cancels the request before receiving the response. This
// should cause the third request not to complete with error
// since it has cancel_if_unresponded = false and cancellation comes
// after it was written.
conn.cancel(aedis::operation::run);
});
auto const endpoints = resolve();
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req0, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
conn.async_exec(req1, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_exec(req2, adapt(), [&](auto ec, auto){
BOOST_TEST(!ec);
conn.async_exec(req3, adapt(), [&](auto ec, auto){
BOOST_TEST(!ec);
});
});
conn.async_run([&](auto ec){
// The first cancellation.
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
conn.reset_stream();
// Reconnects and runs again to test req3.
net::connect(conn.next_layer(), endpoints);
conn.async_run([&](auto ec){
std::cout << ec.message() << std::endl;
BOOST_TEST(!ec);
});
});
ioc.run();
}

View File

@@ -73,7 +73,7 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
resp3::request req1;
req1.get_config().cancel_if_not_connected = false;
req1.get_config().cancel_on_connection_lost = true;
req1.get_config().retry_on_connection_lost = false;
req1.get_config().cancel_if_unresponded = true;
req1.push("HELLO", 3);
req1.push("BLPOP", "any", 0);
@@ -92,7 +92,7 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
resp3::request req2;
req2.get_config().cancel_if_not_connected = false;
req2.get_config().cancel_on_connection_lost = true;
req2.get_config().retry_on_connection_lost= false;
req2.get_config().cancel_if_unresponded = true;
req2.push("HELLO", 3);
req2.push("QUIT");

View File

@@ -168,7 +168,7 @@ BOOST_AUTO_TEST_CASE(reset_before_run_completes)
ioc.run();
}
using slave_operation = aedis::detail::guarded_operation<>;
using slave_operation = aedis::detail::guarded_operation<net::any_io_executor>;
auto master(std::shared_ptr<slave_operation> op) -> net::awaitable<void>
{

File diff suppressed because it is too large