mirror of
https://github.com/boostorg/redis.git
synced 2026-01-27 07:12:08 +00:00
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c88fcfb9ed | ||
|
|
a56bf982ab | ||
|
|
5d0ed0e986 | ||
|
|
15deaa637d | ||
|
|
bb8ff90351 | ||
|
|
7d4902369a | ||
|
|
607ca17a89 | ||
|
|
3849ba42fd | ||
|
|
56bcdb7914 | ||
|
|
73ad66eb93 | ||
|
|
9cf00d6a23 | ||
|
|
a00c9e7439 | ||
|
|
0520791100 | ||
|
|
14b376e36e | ||
|
|
4f9dcc7dc5 | ||
|
|
ad5dd8c30b | ||
|
|
842f864689 |
154
CMakeLists.txt
154
CMakeLists.txt
@@ -10,8 +10,8 @@ cmake_minimum_required(VERSION 3.14)
|
||||
|
||||
project(
|
||||
Aedis
|
||||
VERSION 1.4.0
|
||||
DESCRIPTION "A redis client designed for performance and scalability"
|
||||
VERSION 1.4.1
|
||||
DESCRIPTION "A redis client library"
|
||||
HOMEPAGE_URL "https://mzimbres.github.io/aedis"
|
||||
LANGUAGES CXX
|
||||
)
|
||||
@@ -73,92 +73,108 @@ endif()
|
||||
# Executables
|
||||
#=======================================================================
|
||||
|
||||
add_executable(intro examples/intro.cpp)
|
||||
target_link_libraries(intro common)
|
||||
target_compile_features(intro PUBLIC cxx_std_20)
|
||||
add_test(intro intro)
|
||||
add_executable(cpp20_intro examples/cpp20_intro.cpp)
|
||||
target_link_libraries(cpp20_intro common)
|
||||
target_compile_features(cpp20_intro PUBLIC cxx_std_20)
|
||||
add_test(cpp20_intro cpp20_intro)
|
||||
if (MSVC)
|
||||
target_compile_options(intro PRIVATE /bigobj)
|
||||
target_compile_definitions(intro PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp20_intro PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_intro PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(intro_sync examples/intro_sync.cpp)
|
||||
target_compile_features(intro_sync PUBLIC cxx_std_20)
|
||||
add_test(intro_sync intro_sync)
|
||||
add_test(intro_sync intro_sync)
|
||||
add_executable(cpp20_intro_awaitable_ops examples/cpp20_intro_awaitable_ops.cpp)
|
||||
target_link_libraries(cpp20_intro_awaitable_ops common)
|
||||
target_compile_features(cpp20_intro_awaitable_ops PUBLIC cxx_std_20)
|
||||
add_test(cpp20_intro_awaitable_ops cpp20_intro_awaitable_ops)
|
||||
if (MSVC)
|
||||
target_compile_options(intro_sync PRIVATE /bigobj)
|
||||
target_compile_definitions(intro_sync PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp20_intro_awaitable_ops PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_intro_awaitable_ops PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(chat_room examples/chat_room.cpp)
|
||||
target_compile_features(chat_room PUBLIC cxx_std_20)
|
||||
target_link_libraries(chat_room common)
|
||||
add_executable(cpp17_intro examples/cpp17_intro.cpp)
|
||||
target_compile_features(cpp17_intro PUBLIC cxx_std_17)
|
||||
add_test(cpp17_intro cpp17_intro)
|
||||
if (MSVC)
|
||||
target_compile_options(chat_room PRIVATE /bigobj)
|
||||
target_compile_definitions(chat_room PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp17_intro PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp17_intro PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(containers examples/containers.cpp)
|
||||
target_compile_features(containers PUBLIC cxx_std_20)
|
||||
target_link_libraries(containers common)
|
||||
add_test(containers containers)
|
||||
add_executable(cpp17_intro_sync examples/cpp17_intro_sync.cpp)
|
||||
target_compile_features(cpp17_intro_sync PUBLIC cxx_std_17)
|
||||
add_test(cpp17_intro_sync cpp17_intro_sync)
|
||||
if (MSVC)
|
||||
target_compile_options(containers PRIVATE /bigobj)
|
||||
target_compile_definitions(containers PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp17_intro_sync PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp17_intro_sync PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(echo_server examples/echo_server.cpp)
|
||||
target_compile_features(echo_server PUBLIC cxx_std_20)
|
||||
target_link_libraries(echo_server common)
|
||||
add_executable(cpp20_chat_room examples/cpp20_chat_room.cpp)
|
||||
target_compile_features(cpp20_chat_room PUBLIC cxx_std_20)
|
||||
target_link_libraries(cpp20_chat_room common)
|
||||
if (MSVC)
|
||||
target_compile_options(echo_server PRIVATE /bigobj)
|
||||
target_compile_definitions(echo_server PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp20_chat_room PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_chat_room PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(resolve_with_sentinel examples/resolve_with_sentinel.cpp)
|
||||
target_compile_features(resolve_with_sentinel PUBLIC cxx_std_20)
|
||||
target_link_libraries(resolve_with_sentinel common)
|
||||
#add_test(resolve_with_sentinel resolve_with_sentinel)
|
||||
add_executable(cpp20_containers examples/cpp20_containers.cpp)
|
||||
target_compile_features(cpp20_containers PUBLIC cxx_std_20)
|
||||
target_link_libraries(cpp20_containers common)
|
||||
add_test(cpp20_containers cpp20_containers)
|
||||
if (MSVC)
|
||||
target_compile_options(resolve_with_sentinel PRIVATE /bigobj)
|
||||
target_compile_definitions(resolve_with_sentinel PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp20_containers PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_containers PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(serialization examples/serialization.cpp)
|
||||
target_compile_features(serialization PUBLIC cxx_std_20)
|
||||
target_link_libraries(serialization common)
|
||||
add_test(serialization serialization)
|
||||
add_executable(cpp20_echo_server examples/cpp20_echo_server.cpp)
|
||||
target_compile_features(cpp20_echo_server PUBLIC cxx_std_20)
|
||||
target_link_libraries(cpp20_echo_server common)
|
||||
if (MSVC)
|
||||
target_compile_options(serialization PRIVATE /bigobj)
|
||||
target_compile_definitions(serialization PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp20_echo_server PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_echo_server PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(subscriber examples/subscriber.cpp)
|
||||
target_compile_features(subscriber PUBLIC cxx_std_20)
|
||||
target_link_libraries(subscriber common)
|
||||
add_executable(cpp20_resolve_with_sentinel examples/cpp20_resolve_with_sentinel.cpp)
|
||||
target_compile_features(cpp20_resolve_with_sentinel PUBLIC cxx_std_20)
|
||||
target_link_libraries(cpp20_resolve_with_sentinel common)
|
||||
#add_test(cpp20_resolve_with_sentinel cpp20_resolve_with_sentinel)
|
||||
if (MSVC)
|
||||
target_compile_options(subscriber PRIVATE /bigobj)
|
||||
target_compile_definitions(subscriber PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp20_resolve_with_sentinel PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_resolve_with_sentinel PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(intro_tls examples/intro_tls.cpp)
|
||||
target_compile_features(intro_tls PUBLIC cxx_std_20)
|
||||
add_test(intro_tls intro_tls)
|
||||
target_link_libraries(intro_tls OpenSSL::Crypto OpenSSL::SSL)
|
||||
target_link_libraries(intro_tls common)
|
||||
add_executable(cpp20_serialization examples/cpp20_serialization.cpp)
|
||||
target_compile_features(cpp20_serialization PUBLIC cxx_std_20)
|
||||
target_link_libraries(cpp20_serialization common)
|
||||
add_test(cpp20_serialization cpp20_serialization)
|
||||
if (MSVC)
|
||||
target_compile_options(intro_tls PRIVATE /bigobj)
|
||||
target_compile_definitions(intro_tls PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp20_serialization PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_serialization PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(low_level_async examples/low_level_async.cpp)
|
||||
target_compile_features(low_level_async PUBLIC cxx_std_20)
|
||||
add_test(low_level_async low_level_async)
|
||||
target_link_libraries(low_level_async common)
|
||||
add_executable(cpp20_subscriber examples/cpp20_subscriber.cpp)
|
||||
target_compile_features(cpp20_subscriber PUBLIC cxx_std_20)
|
||||
target_link_libraries(cpp20_subscriber common)
|
||||
if (MSVC)
|
||||
target_compile_options(low_level_async PRIVATE /bigobj)
|
||||
target_compile_definitions(low_level_async PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp20_subscriber PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_subscriber PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(cpp20_intro_tls examples/cpp20_intro_tls.cpp)
|
||||
target_compile_features(cpp20_intro_tls PUBLIC cxx_std_20)
|
||||
add_test(cpp20_intro_tls cpp20_intro_tls)
|
||||
target_link_libraries(cpp20_intro_tls OpenSSL::Crypto OpenSSL::SSL)
|
||||
target_link_libraries(cpp20_intro_tls common)
|
||||
if (MSVC)
|
||||
target_compile_options(cpp20_intro_tls PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_intro_tls PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(cpp20_low_level_async examples/cpp20_low_level_async.cpp)
|
||||
target_compile_features(cpp20_low_level_async PUBLIC cxx_std_20)
|
||||
add_test(cpp20_low_level_async cpp20_low_level_async)
|
||||
target_link_libraries(cpp20_low_level_async common)
|
||||
if (MSVC)
|
||||
target_compile_options(cpp20_low_level_async PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp20_low_level_async PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(echo_server_client benchmarks/cpp/asio/echo_server_client.cpp)
|
||||
@@ -175,12 +191,12 @@ if (MSVC)
|
||||
target_compile_definitions(echo_server_direct PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(low_level_sync examples/low_level_sync.cpp)
|
||||
target_compile_features(low_level_sync PUBLIC cxx_std_17)
|
||||
add_test(low_level_sync low_level_sync)
|
||||
add_executable(cpp17_low_level_sync examples/cpp17_low_level_sync.cpp)
|
||||
target_compile_features(cpp17_low_level_sync PUBLIC cxx_std_17)
|
||||
add_test(cpp17_low_level_sync cpp17_low_level_sync)
|
||||
if (MSVC)
|
||||
target_compile_options(low_level_sync PRIVATE /bigobj)
|
||||
target_compile_definitions(low_level_sync PRIVATE _WIN32_WINNT=0x0601)
|
||||
target_compile_options(cpp17_low_level_sync PRIVATE /bigobj)
|
||||
target_compile_definitions(cpp17_low_level_sync PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(test_conn_exec tests/conn_exec.cpp)
|
||||
@@ -191,6 +207,14 @@ if (MSVC)
|
||||
target_compile_definitions(test_conn_exec PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(test_conn_exec_retry tests/conn_exec_retry.cpp)
|
||||
target_compile_features(test_conn_exec_retry PUBLIC cxx_std_20)
|
||||
add_test(test_conn_exec_retry test_conn_exec_retry)
|
||||
if (MSVC)
|
||||
target_compile_options(test_conn_exec_retry PRIVATE /bigobj)
|
||||
target_compile_definitions(test_conn_exec_retry PRIVATE _WIN32_WINNT=0x0601)
|
||||
endif()
|
||||
|
||||
add_executable(test_conn_push tests/conn_push.cpp)
|
||||
target_compile_features(test_conn_push PUBLIC cxx_std_20)
|
||||
add_test(test_conn_push test_conn_push)
|
||||
|
||||
366
README.md
366
README.md
@@ -1,177 +1,244 @@
|
||||
|
||||
# Aedis
|
||||
|
||||
## Overview
|
||||
|
||||
Aedis is a [Redis](https://redis.io/) client library built on top of
|
||||
[Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html)
|
||||
[Boost.Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html)
|
||||
that implements the latest version of the Redis communication
|
||||
protocol
|
||||
[RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md).
|
||||
It makes communication with a Redis server easy by hiding most of
|
||||
the low-level Asio-related code away from the user, which in the majority of
|
||||
the cases will be concerned with only three library entities
|
||||
It makes communication with a Redis server easy by hiding low-level
|
||||
code away from the user, which, in the majority of the cases will be
|
||||
concerned with only three library entities
|
||||
|
||||
* `aedis::connection`: A connection to the Redis server.
|
||||
* `aedis::resp3::request`: A container of Redis commands.
|
||||
* `aedis::adapt()`: Adapts data structures to receive responses.
|
||||
* `aedis::connection`: A connection to the Redis server with
|
||||
high-level functions to execute Redis commands, receive server
|
||||
pushes and support for automatic command
|
||||
[pipelines](https://redis.io/docs/manual/pipelining/).
|
||||
* `aedis::resp3::request`: A container of Redis commands that supports
|
||||
STL containers and user defined data types.
|
||||
* `aedis::adapt()`: A function that adapts data structures to receive responses.
|
||||
|
||||
For example, the coroutine below uses a short-lived connection to read Redis
|
||||
[hashes](https://redis.io/docs/data-types/hashes/)
|
||||
in a `std::map` (see intro.cpp and containers.cpp)
|
||||
In the next sections we will cover all those points in detail with
|
||||
examples. The requirements for using Aedis are
|
||||
|
||||
* Boost 1.80 or greater.
|
||||
* C++17 minimum.
|
||||
* Redis 6 or higher (must support RESP3).
|
||||
* Gcc (10, 11, 12), Clang (11, 13, 14) and Visual Studio (16 2019, 17 2022).
|
||||
* Have basic-level knowledge about Redis and understand Asio and its asynchronous model.
|
||||
|
||||
Readers that are not familiar with Redis can learn more about
|
||||
it on https://redis.io/docs/, in essence
|
||||
|
||||
> Redis is an open source (BSD licensed), in-memory data structure
|
||||
> store used as a database, cache, message broker, and streaming
|
||||
> engine. Redis provides data structures such as strings, hashes,
|
||||
> lists, sets, sorted sets with range queries, bitmaps, hyperloglogs,
|
||||
> geospatial indexes, and streams. Redis has built-in replication, Lua
|
||||
> scripting, LRU eviction, transactions, and different levels of
|
||||
> on-disk persistence, and provides high availability via Redis
|
||||
> Sentinel and automatic partitioning with Redis Cluster.
|
||||
|
||||
<a name="connection"></a>
|
||||
## Connection
|
||||
|
||||
Let us start with a simple application that uses a short-lived
|
||||
connection to read Redis
|
||||
[hashes](https://redis.io/docs/data-types/hashes/) in a `std::map`
|
||||
|
||||
```cpp
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto co_main() -> net::awaitable<void>
|
||||
{
|
||||
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
|
||||
|
||||
// From examples/common.hpp to avoid vebosity
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
|
||||
// A request contains multiple commands.
|
||||
// A request can contain multiple commands.
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("HGETALL", "hset-key");
|
||||
req.push("QUIT");
|
||||
|
||||
// Responses as tuple elements.
|
||||
std::tuple<aedis::ignore, std::map<std::string, std::string>, aedis::ignore> resp;
|
||||
// The tuple elements will store the responses to each individual
|
||||
// command. The responses to HELLO and QUIT are being ignored for
|
||||
// simplicity.
|
||||
std::tuple<ignore, std::map<std::string, std::string>, ignore> resp;
|
||||
|
||||
// Executes the request and reads the response.
|
||||
// Executes the request. See below why we are using operator ||.
|
||||
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
|
||||
|
||||
// Use the map from std::get<1>(resp) ...
|
||||
}
|
||||
```
|
||||
|
||||
The execution of `connection::async_exec` above is composed with
|
||||
`connection::async_run` with the aid of the Asio awaitable operator ||
|
||||
that ensures that one operation is cancelled as soon as the other
|
||||
completes, these functions play the following roles
|
||||
The example above uses the Asio awaitable `operator ||` to compose
|
||||
`connection::async_exec` and `connection::async_run` in an
|
||||
operation we can `co_await` on. It also provides cancelation of one of
|
||||
the operations when the other completes. The role played by these
|
||||
functions are
|
||||
|
||||
* `connection::async_exec`: Execute commands (i.e. write the request and reads the response).
|
||||
* `connection::async_run`: Coordinate read and write operations and remains suspended until the connection is lost.
|
||||
* `connection::async_exec`: Execute commands by queuing the request
|
||||
for writing and wait for the response sent back by
|
||||
Redis. Can be called from multiple places in your code concurrently.
|
||||
* `connection::async_run`: Coordinate low-level read and write
|
||||
operations. More specifically, it will hand IO control to
|
||||
`async_exec` when a response arrives and to `async_receive` when a
|
||||
server-push is received. It is also responsible for triggering writes of pending
|
||||
requests.
|
||||
|
||||
Let us dig in.
|
||||
The example above is also available in other programming styles for comparison
|
||||
|
||||
<a name="connection"></a>
|
||||
## Connection
|
||||
* cpp20_intro_awaitable_ops.cpp: The version shown above.
|
||||
* cpp20_intro.cpp: Does not use awaitable operators.
|
||||
* cpp20_intro_tls.cpp: Communicates over TLS.
|
||||
* cpp17_intro.cpp: Uses callbacks and requires C++17.
|
||||
* cpp17_intro_sync.cpp: Runs `async_run` in a separate thread and
|
||||
performs synchronous calls to `async_exec`.
|
||||
|
||||
In general we will want to reuse the same connection for multiple
|
||||
requests, we can do this with the example above by decoupling the
|
||||
HELLO command and the call to `async_run` in a separate coroutine
|
||||
For performance reasons we will usually want to perform multiple
|
||||
requests with the same connection. We can do this in the example above
|
||||
by letting `async_run` run detached in a separate coroutine, for
|
||||
example (see cpp20_intro.cpp)
|
||||
|
||||
```cpp
|
||||
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3); // Upgrade to RESP3
|
||||
|
||||
// Notice we use && instead of || so async_run is not cancelled
|
||||
// when the response to HELLO comes.
|
||||
co_await (conn->async_run() && conn->async_exec(req));
|
||||
co_await conn->async_run();
|
||||
}
|
||||
```
|
||||
We can now let `run` run detached in the background while other
|
||||
coroutines perform requests on the connection
|
||||
|
||||
```cpp
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto hello(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
|
||||
|
||||
// Calls async_run detached.
|
||||
net::co_spawn(ex, run(conn), net::detached)
|
||||
|
||||
// Here we can pass conn around to other coroutines so they can make requests.
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
With this separation, it is now easy to incorporate other operations
|
||||
in our application, for example, to cancel the connection on `SIGINT`
|
||||
and `SIGTERM` we can extend `run` as follows
|
||||
|
||||
```cpp
|
||||
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
signal_set sig{ex, SIGINT, SIGTERM};
|
||||
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
|
||||
co_await ((conn->async_run() || sig.async_wait()) && conn->async_exec(req));
|
||||
co_await conn->async_exec(req);
|
||||
}
|
||||
```
|
||||
|
||||
Likewise we can incorporate support for server pushes, healthy checks and pubsub
|
||||
|
||||
```cpp
|
||||
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
auto ping(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
signal_set sig{ex, SIGINT, SIGTERM};
|
||||
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("SUBSCRIBE", "channel1", "channel2");
|
||||
req.push("PING", "Hello world");
|
||||
|
||||
co_await ((conn->async_run() || sig.async_wait() || receiver(conn) || healthy_checker(conn))
|
||||
&& conn->async_exec(req));
|
||||
std::tuple<std::string> resp;
|
||||
co_await conn->async_exec(req, adapt(resp));
|
||||
// Use the response ...
|
||||
}
|
||||
```
|
||||
|
||||
The definition of `receiver` and `healthy_checker` above can be found
|
||||
in subscriber.cpp. Adding a loop around `async_run` produces a simple
|
||||
way to support reconnection _while there are pending operations on the connection_,
|
||||
for example, to reconnect to the same address
|
||||
auto quit(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
resp3::request req;
|
||||
req.push("QUIT");
|
||||
|
||||
```cpp
|
||||
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
co_await conn->async_exec(req);
|
||||
}
|
||||
|
||||
auto co_main() -> net::awaitable<void>
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
steady_timer timer{ex};
|
||||
auto conn = std::make_shared<connection>(ex);
|
||||
net::co_spawn(ex, run(conn), net::detached);
|
||||
co_await hello(conn);
|
||||
co_await ping(conn);
|
||||
co_await quit(conn);
|
||||
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("SUBSCRIBE", "channel1", "channel2");
|
||||
// conn can be passed around to other coroutines that need to
|
||||
// communicate with Redis. For example, sessions in a HTTP and
|
||||
// Websocket server.
|
||||
}
|
||||
```
|
||||
|
||||
With this separation, it is now easy to incorporate other long-running
|
||||
operations in our application, for example, the run coroutine below
|
||||
adds signal handling and a healthy checker (see cpp20_echo_server.cpp)
|
||||
|
||||
```cpp
|
||||
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
signal_set sig{co_await net::this_coro::executor, SIGINT, SIGTERM};
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await (conn->async_run() || sig.async_wait() || healthy_checker(conn));
|
||||
}
|
||||
```
|
||||
|
||||
The definition of the `healthy_checker` used above can be found in common.cpp.
|
||||
|
||||
### Server pushes
|
||||
|
||||
Redis servers can also send a variety of pushes to the client, some of
|
||||
them are
|
||||
|
||||
* [Pubsub](https://redis.io/docs/manual/pubsub/)
|
||||
* [Keyspace notification](https://redis.io/docs/manual/keyspace-notifications/)
|
||||
* [Client-side caching](https://redis.io/docs/manual/client-side-caching/)
|
||||
|
||||
The connection class supports server pushes by means of the
|
||||
`aedis::connection::async_receive` function, the coroutine shows how
|
||||
to used it
|
||||
|
||||
```cpp
|
||||
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
using resp_type = std::vector<resp3::node<std::string>>;
|
||||
for (resp_type resp;;) {
|
||||
co_await conn->async_receive(adapt(resp));
|
||||
// Use resp and clear the response for a new push.
|
||||
resp.clear();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `receiver` defined above can be run detached or incorporated
|
||||
in the `run` function as we did for the `healthy_checker`
|
||||
|
||||
```cpp
|
||||
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
signal_set sig{co_await net::this_coro::executor, SIGINT, SIGTERM};
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await (conn->async_run() || sig.async_wait() || healthy_checker(conn) || receiver(conn));
|
||||
}
|
||||
```
|
||||
|
||||
### Reconnecting
|
||||
|
||||
Adding a loop around `async_run` produces a simple way to support
|
||||
reconnection _while there are pending operations on the connection_,
|
||||
for example, to reconnect to the same address (see cpp20_subscriber.cpp)
|
||||
|
||||
```cpp
|
||||
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
steady_timer timer{co_await net::this_coro::executor};
|
||||
|
||||
for (;;) {
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await ((conn->async_run() || healthy_checker(conn) || receiver(conn))
|
||||
&& conn->async_exec(req));
|
||||
co_await (conn->async_run() || healthy_checker(conn) || receiver(conn));
|
||||
|
||||
// Prepare the stream for a new connection.
|
||||
conn->reset_stream();
|
||||
|
||||
// Waits one second before trying to reconnect.
|
||||
timer.expires_after(std::chrono::seconds{1});
|
||||
co_await timer.async_wait();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
For failover with sentinels see `resolve_with_sentinel.cpp`. At
|
||||
this point the reasons for why `async_run` was introduced in Aedis
|
||||
might have become apparent to the reader
|
||||
|
||||
* Provide quick reaction to disconnections and hence faster failover.
|
||||
* Support server pushes and requests in the same connection object, concurrently.
|
||||
* Separate requests, handling of server pushes and reconnection operations.
|
||||
This feature results in considerable simplification of backend code
|
||||
and makes it easier to write failover-safe applications. For example,
|
||||
a Websocket server might have a 10k sessions communicating with Redis at
|
||||
the time the connection is lost (or maybe killed by the server admin
|
||||
to force a failover). It would be concerning if each individual section
|
||||
were to throw exceptions and handle error. With the pattern shown
|
||||
above the only place that has to manage the error is the run function.
|
||||
|
||||
### Cancellation
|
||||
|
||||
Aedis supports both implicit and explicit cancellation of connection
|
||||
operations. Explicit cancellation is supported by means of the
|
||||
`aedis::connection::cancel` member function. Implicit cancellation,
|
||||
like those that may happen when using Asio awaitable operators && and
|
||||
|| will be discussed with more detail below.
|
||||
|
||||
```cpp
|
||||
co_await (conn.async_run(...) && conn.async_exec(...))
|
||||
```
|
||||
|
||||
* Provide a simple way to send HELLO and perform channel subscription.
|
||||
`aedis::connection::cancel` member function. Implicit
|
||||
terminal-cancellation, like those that happen when using Asio
|
||||
awaitable `operator ||` will be discussed with more detail below.
|
||||
|
||||
```cpp
|
||||
co_await (conn.async_run(...) || conn.async_exec(...))
|
||||
@@ -186,16 +253,10 @@ co_await (conn.async_exec(...) || time.async_wait(...))
|
||||
|
||||
* Provides a way to limit how long the execution of a single request
|
||||
should last.
|
||||
* The cancellation will be ignored if the request has already
|
||||
been written to the socket.
|
||||
* WARNING: If the timer fires after the request has been sent but before the
|
||||
response has been received, the connection will be closed.
|
||||
* It is usually a better idea to have a healthy checker than adding
|
||||
per request timeout, see subscriber.cpp for an example.
|
||||
|
||||
```cpp
|
||||
co_await (conn.async_run(...) || time.async_wait(...))
|
||||
```
|
||||
|
||||
* Sets a limit on how long the connection should live.
|
||||
per request timeout, see cpp20_subscriber.cpp for an example.
|
||||
|
||||
```cpp
|
||||
co_await (conn.async_exec(...) || conn.async_exec(...) || ... || conn.async_exec(...))
|
||||
@@ -203,8 +264,8 @@ co_await (conn.async_exec(...) || conn.async_exec(...) || ... || conn.async_exec
|
||||
|
||||
* This works but is unnecessary. Unless the user has set
|
||||
`aedis::resp3::request::config::coalesce` to `false`, and he
|
||||
shouldn't, the connection will automatically merge the individual
|
||||
requests into a single payload anyway.
|
||||
usually shouldn't, the connection will automatically merge the
|
||||
individual requests into a single payload.
|
||||
|
||||
<a name="requests"></a>
|
||||
## Requests
|
||||
@@ -218,6 +279,7 @@ Redis documentation they are called
|
||||
std::list<std::string> list {...};
|
||||
std::map<std::string, mystruct> map { ...};
|
||||
|
||||
// The request can contains multiple commands.
|
||||
request req;
|
||||
|
||||
// Command with variable length of arguments.
|
||||
@@ -244,10 +306,10 @@ with integer data types e.g. `int` and `std::string` out of the box.
|
||||
To send your own data type define a `to_bulk` function like this
|
||||
|
||||
```cpp
|
||||
// Example struct.
|
||||
// User defined type.
|
||||
struct mystruct {...};
|
||||
|
||||
// Serialize your data structure here.
|
||||
// Serialize it in to_bulk.
|
||||
void to_bulk(std::pmr::string& to, mystruct const& obj)
|
||||
{
|
||||
std::string dummy = "Dummy serializaiton string.";
|
||||
@@ -266,7 +328,7 @@ std::map<std::string, mystruct> map {...};
|
||||
req.push_range("HSET", "key", map);
|
||||
```
|
||||
|
||||
Example serialization.cpp shows how store json strings in Redis.
|
||||
Example cpp20_serialization.cpp shows how store json strings in Redis.
|
||||
|
||||
<a name="responses"></a>
|
||||
|
||||
@@ -281,7 +343,7 @@ reader is advised to read it carefully.
|
||||
Aedis uses the following strategy to support Redis responses
|
||||
|
||||
* **Static**: For `aedis::resp3::request` whose sizes are known at compile time
|
||||
std::tuple is supported.
|
||||
`std::tuple` is supported.
|
||||
* **Dynamic**: Otherwise use `std::vector<aedis::resp3::node<std::string>>`.
|
||||
|
||||
For example, below is a request with a compile time size
|
||||
@@ -390,13 +452,13 @@ subset of the RESP3 specification.
|
||||
|
||||
### Pushes
|
||||
|
||||
Commands that have push response like
|
||||
Commands that have no response like
|
||||
|
||||
* `"SUBSCRIBE"`
|
||||
* `"PSUBSCRIBE"`
|
||||
* `"UNSUBSCRIBE"`
|
||||
|
||||
must be **NOT** be included in the tuple. For example, the request below
|
||||
must be **NOT** be included in the response tuple. For example, the request below
|
||||
|
||||
```cpp
|
||||
request req;
|
||||
@@ -465,7 +527,7 @@ std::tuple<
|
||||
co_await conn->async_exec(req, adapt(resp));
|
||||
```
|
||||
|
||||
For a complete example see containers.cpp.
|
||||
For a complete example see cpp20_containers.cpp.
|
||||
|
||||
### Deserialization
|
||||
|
||||
@@ -541,26 +603,28 @@ from Redis with `HGETALL`, some of the options are
|
||||
* `std::map<U, V>`: Efficient if you are storing serialized data. Avoids temporaries and requires `from_bulk` for `U` and `V`.
|
||||
|
||||
In addition to the above users can also use unordered versions of the
|
||||
containers. The same reasoning also applies to sets e.g. `SMEMBERS`
|
||||
containers. The same reasoning applies to sets e.g. `SMEMBERS`
|
||||
and other data structures in general.
|
||||
|
||||
## Examples
|
||||
|
||||
These examples demonstrate what has been discussed so far.
|
||||
The examples below show how to use the features discussed so far
|
||||
|
||||
* intro.cpp: The Aedis hello-world program. Sends one command and quits the connection.
|
||||
* intro_tls.cpp: Same as intro.cpp but over TLS.
|
||||
* intro_sync.cpp: Shows how to use the connection class synchronously.
|
||||
* containers.cpp: Shows how to send and receive STL containers and how to use transactions.
|
||||
* serialization.cpp: Shows how to serialize types using Boost.Json.
|
||||
* resolve_with_sentinel.cpp: Shows how to resolve a master address using sentinels.
|
||||
* subscriber.cpp: Shows how to implement pubsub with reconnection re-subscription.
|
||||
* echo_server.cpp: A simple TCP echo server.
|
||||
* chat_room.cpp: A command line chat built on Redis pubsub.
|
||||
* low_level_sync.cpp: Sends a ping synchronously using the low-level API.
|
||||
* low_level_async.cpp: Sends a ping asynchronously using the low-level API.
|
||||
* cpp20_intro_awaitable_ops.cpp: The version shown above.
|
||||
* cpp20_intro.cpp: Does not use awaitable operators.
|
||||
* cpp20_intro_tls.cpp: Communicates over TLS.
|
||||
* cpp20_containers.cpp: Shows how to send and receive STL containers and how to use transactions.
|
||||
* cpp20_serialization.cpp: Shows how to serialize types using Boost.Json.
|
||||
* cpp20_resolve_with_sentinel.cpp: Shows how to resolve a master address using sentinels.
|
||||
* cpp20_subscriber.cpp: Shows how to implement pubsub with reconnection re-subscription.
|
||||
* cpp20_echo_server.cpp: A simple TCP echo server.
|
||||
* cpp20_chat_room.cpp: A command line chat built on Redis pubsub.
|
||||
* cpp20_low_level_async.cpp: Sends a ping asynchronously using the low-level API.
|
||||
* cpp17_low_level_sync.cpp: Sends a ping synchronously using the low-level API.
|
||||
* cpp17_intro.cpp: Uses callbacks and requires C++17.
|
||||
* cpp17_intro_sync.cpp: Runs `async_run` in a separate thread and performs synchronous calls to `async_exec`.
|
||||
|
||||
To avoid repetition code that is common to all examples has been
|
||||
To avoid repetition code that is common to some examples has been
|
||||
grouped in common.hpp. The main function used in some async examples
|
||||
has been factored out in the main.cpp file.
|
||||
|
||||
@@ -667,10 +731,10 @@ stars, namely
|
||||
|
||||
### Aedis vs Redis-plus-plus
|
||||
|
||||
Before we start it is important to mentioning some of the things
|
||||
Before we start it is important to mention some of the things
|
||||
redis-plus-plus does not support
|
||||
|
||||
* The latest version of the communication protocol RESP3. Without it it is impossible to support some important Redis features like client side caching, among other things.
|
||||
* The latest version of the communication protocol RESP3. Without that it is impossible to support some important Redis features like client side caching, among other things.
|
||||
* Coroutines.
|
||||
* Reading responses directly in user data structures to avoid creating temporaries.
|
||||
* Error handling with support for error-code.
|
||||
@@ -767,7 +831,7 @@ It is also not clear how are pipelines realised with this design
|
||||
## Reference
|
||||
|
||||
* [High-Level](#high-level-api): Covers the topics discussed in this document.
|
||||
* [Low-Level](#low-level-api): Covers low-level building blocks. Provided mostly for developers, most users won't need any information provided here.
|
||||
* [Low-Level](#low-level-api): Covers low-level building blocks. Provided mostly for developers, users won't usually need any information provided here.
|
||||
|
||||
## Installation
|
||||
|
||||
@@ -781,25 +845,11 @@ library, so you can starting using it right away by adding the
|
||||
```
|
||||
|
||||
in no more than one source file in your applications. To build the
|
||||
examples and test cmake is supported, for example
|
||||
examples and tests cmake is supported, for example
|
||||
|
||||
```cpp
|
||||
BOOST_ROOT=/opt/boost_1_80_0 cmake --preset dev
|
||||
```
|
||||
|
||||
The requirements for using Aedis are
|
||||
|
||||
- Boost 1.80 or greater.
|
||||
- C++17 minimum.
|
||||
- Redis 6 or higher (must support RESP3).
|
||||
- Optionally also redis-cli and Redis Sentinel.
|
||||
|
||||
The following compilers are supported
|
||||
|
||||
- Gcc: 10, 11, 12.
|
||||
- Clang: 11, 13, 14.
|
||||
- Visual Studio 17 2022, Visual Studio 16 2019.
|
||||
|
||||
## Acknowledgement
|
||||
|
||||
Acknowledgement to people that helped shape Aedis
|
||||
@@ -809,11 +859,13 @@ Acknowledgement to people that helped shape Aedis
|
||||
* Petr Dannhofer ([Eddie-cz](https://github.com/Eddie-cz)): For helping me understand how the `AUTH` and `HELLO` command can influence each other.
|
||||
* Mohammad Nejati ([ashtum](https://github.com/ashtum)): For pointing out scenarios where calls to `async_exec` should fail when the connection is lost.
|
||||
* Klemens Morgenstern ([klemens-morgenstern](https://github.com/klemens-morgenstern)): For useful discussion about timeouts, cancellation, synchronous interfaces and general help with Asio.
|
||||
* Vinnie Falco ([vinniefalco](https://github.com/vinniefalco)): For general suggestions about how to improve the code and the documentation.
|
||||
|
||||
## Changelog
|
||||
|
||||
### v1.4.0
|
||||
### v1.4.0-1
|
||||
|
||||
* Renames `retry_on_connection_lost` to `cancel_if_unresponded`. (v1.4.1)
|
||||
* Removes dependency on Boost.Hana, boost::string_view, Boost.Variant2 and Boost.Spirit.
|
||||
* Fixes build and setup CI on windows.
|
||||
|
||||
@@ -989,7 +1041,7 @@ Acknowledgement to people that helped shape Aedis
|
||||
|
||||
* `connection::async_receive_event` is now being used to communicate
|
||||
internal events to the user, such as resolve, connect, push etc. For
|
||||
examples see subscriber.cpp and `connection::event`.
|
||||
examples see cpp20_subscriber.cpp and `connection::event`.
|
||||
|
||||
* The `aedis` directory has been moved to `include` to look more
|
||||
similar to Boost libraries. Users should now replace `-I/aedis-path`
|
||||
|
||||
@@ -1285,7 +1285,7 @@ HTML_STYLESHEET =
|
||||
# list). For an example see the documentation.
|
||||
# This tag requires that the tag GENERATE_HTML is set to YES.
|
||||
|
||||
HTML_EXTRA_STYLESHEET = doc/aedis.css
|
||||
HTML_EXTRA_STYLESHEET = doc/doxygen-awesome.css doc/doxygen-awesome-sidebar-only.css
|
||||
|
||||
# The HTML_EXTRA_FILES tag can be used to specify one or more extra images or
|
||||
# other source files which should be copied to the HTML output directory. Note
|
||||
@@ -1578,7 +1578,7 @@ ECLIPSE_DOC_ID = org.doxygen.Project
|
||||
# The default value is: NO.
|
||||
# This tag requires that the tag GENERATE_HTML is set to YES.
|
||||
|
||||
DISABLE_INDEX = YES
|
||||
DISABLE_INDEX = NO
|
||||
|
||||
# The GENERATE_TREEVIEW tag is used to specify whether a tree-like index
|
||||
# structure should be generated to display hierarchical information. If the tag
|
||||
|
||||
1609
doc/aedis.css
1609
doc/aedis.css
File diff suppressed because it is too large
Load Diff
115
doc/doxygen-awesome-sidebar-only.css
Normal file
115
doc/doxygen-awesome-sidebar-only.css
Normal file
@@ -0,0 +1,115 @@
|
||||
/**
|
||||
|
||||
Doxygen Awesome
|
||||
https://github.com/jothepro/doxygen-awesome-css
|
||||
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2021 jothepro
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
||||
*/
|
||||
|
||||
html {
|
||||
/* side nav width. MUST be = `TREEVIEW_WIDTH`.
|
||||
* Make sure it is wide enough to contain the page title (logo + title + version)
|
||||
*/
|
||||
--side-nav-fixed-width: 335px;
|
||||
--menu-display: none;
|
||||
|
||||
--top-height: 120px;
|
||||
--toc-sticky-top: -25px;
|
||||
--toc-max-height: calc(100vh - 2 * var(--spacing-medium) - 25px);
|
||||
}
|
||||
|
||||
#projectname {
|
||||
white-space: nowrap;
|
||||
}
|
||||
|
||||
|
||||
@media screen and (min-width: 768px) {
|
||||
html {
|
||||
--searchbar-background: var(--page-background-color);
|
||||
}
|
||||
|
||||
#side-nav {
|
||||
min-width: var(--side-nav-fixed-width);
|
||||
max-width: var(--side-nav-fixed-width);
|
||||
top: var(--top-height);
|
||||
overflow: visible;
|
||||
}
|
||||
|
||||
#nav-tree, #side-nav {
|
||||
height: calc(100vh - var(--top-height)) !important;
|
||||
}
|
||||
|
||||
#nav-tree {
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
#top {
|
||||
display: block;
|
||||
border-bottom: none;
|
||||
height: var(--top-height);
|
||||
margin-bottom: calc(0px - var(--top-height));
|
||||
max-width: var(--side-nav-fixed-width);
|
||||
overflow: hidden;
|
||||
background: var(--side-nav-background);
|
||||
}
|
||||
#main-nav {
|
||||
float: left;
|
||||
padding-right: 0;
|
||||
}
|
||||
|
||||
.ui-resizable-handle {
|
||||
cursor: default;
|
||||
width: 1px !important;
|
||||
box-shadow: 0 calc(-2 * var(--top-height)) 0 0 var(--separator-color);
|
||||
}
|
||||
|
||||
#nav-path {
|
||||
position: fixed;
|
||||
right: 0;
|
||||
left: var(--side-nav-fixed-width);
|
||||
bottom: 0;
|
||||
width: auto;
|
||||
}
|
||||
|
||||
#doc-content {
|
||||
height: calc(100vh - 31px) !important;
|
||||
padding-bottom: calc(3 * var(--spacing-large));
|
||||
padding-top: calc(var(--top-height) - 80px);
|
||||
box-sizing: border-box;
|
||||
margin-left: var(--side-nav-fixed-width) !important;
|
||||
}
|
||||
|
||||
#MSearchBox {
|
||||
width: calc(var(--side-nav-fixed-width) - calc(2 * var(--spacing-medium)));
|
||||
}
|
||||
|
||||
#MSearchField {
|
||||
width: calc(var(--side-nav-fixed-width) - calc(2 * var(--spacing-medium)) - 65px);
|
||||
}
|
||||
|
||||
#MSearchResultsWindow {
|
||||
left: var(--spacing-medium) !important;
|
||||
right: auto;
|
||||
}
|
||||
}
|
||||
2405
doc/doxygen-awesome.css
Normal file
2405
doc/doxygen-awesome.css
Normal file
File diff suppressed because it is too large
Load Diff
@@ -4,5 +4,4 @@
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <aedis.hpp>
|
||||
#include <aedis/src.hpp>
|
||||
|
||||
@@ -10,11 +10,19 @@
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
extern boost::asio::awaitable<void> async_main();
|
||||
extern boost::asio::awaitable<void> co_main(std::string, std::string);
|
||||
|
||||
auto main() -> int
|
||||
auto main(int argc, char * argv[]) -> int
|
||||
{
|
||||
return run(async_main());
|
||||
std::string host = "127.0.0.1";
|
||||
std::string port = "6379";
|
||||
|
||||
if (argc == 3) {
|
||||
host = argv[1];
|
||||
port = argv[2];
|
||||
}
|
||||
|
||||
return run(co_main(host, port));
|
||||
}
|
||||
|
||||
#else // defined(BOOST_ASIO_HAS_CO_AWAIT)
|
||||
|
||||
101
examples/cpp17_intro.cpp
Normal file
101
examples/cpp17_intro.cpp
Normal file
@@ -0,0 +1,101 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <boost/asio.hpp>
|
||||
#include <aedis.hpp>
|
||||
#include <aedis/src.hpp>
|
||||
|
||||
namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
using aedis::resp3::request;
|
||||
using aedis::adapt;
|
||||
using aedis::operation;
|
||||
|
||||
void log(boost::system::error_code const& ec, char const* prefix)
|
||||
{
|
||||
std::clog << prefix << ec.message() << std::endl;
|
||||
}
|
||||
|
||||
auto main(int argc, char * argv[]) -> int
|
||||
{
|
||||
try {
|
||||
std::string host = "127.0.0.1";
|
||||
std::string port = "6379";
|
||||
|
||||
if (argc == 3) {
|
||||
host = argv[1];
|
||||
port = argv[2];
|
||||
}
|
||||
|
||||
// The request
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("PING", "Hello world");
|
||||
req.push("QUIT");
|
||||
|
||||
// The response.
|
||||
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
|
||||
|
||||
net::io_context ioc;
|
||||
|
||||
// IO objects.
|
||||
net::ip::tcp::resolver resv{ioc};
|
||||
aedis::connection conn{ioc};
|
||||
|
||||
// Resolve endpoints.
|
||||
net::ip::tcp::resolver::results_type endpoints;
|
||||
|
||||
// async_run callback.
|
||||
auto on_run = [](auto ec)
|
||||
{
|
||||
if (ec)
|
||||
return log(ec, "on_run: ");
|
||||
};
|
||||
|
||||
// async_exec callback.
|
||||
auto on_exec = [&](auto ec, auto)
|
||||
{
|
||||
if (ec) {
|
||||
conn.cancel(operation::run);
|
||||
return log(ec, "on_exec: ");
|
||||
}
|
||||
|
||||
std::cout << "PING: " << std::get<1>(resp) << std::endl;
|
||||
};
|
||||
|
||||
// Connect callback.
|
||||
auto on_connect = [&](auto ec, auto)
|
||||
{
|
||||
if (ec)
|
||||
return log(ec, "on_connect: ");
|
||||
|
||||
conn.async_run(on_run);
|
||||
conn.async_exec(req, adapt(resp), on_exec);
|
||||
};
|
||||
|
||||
// Resolve callback.
|
||||
auto on_resolve = [&](auto ec, auto const& addrs)
|
||||
{
|
||||
if (ec)
|
||||
return log(ec, "on_resolve: ");
|
||||
|
||||
endpoints = addrs;
|
||||
net::async_connect(conn.next_layer(), endpoints, on_connect);
|
||||
};
|
||||
|
||||
resv.async_resolve(host, port, on_resolve);
|
||||
|
||||
ioc.run();
|
||||
return 0;
|
||||
|
||||
} catch (std::exception const& e) {
|
||||
std::cerr << "Error: " << e.what() << std::endl;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
@@ -31,15 +31,30 @@ auto exec(std::shared_ptr<connection> conn, resp3::request const& req, Adapter a
|
||||
auto logger = [](auto const& ec)
|
||||
{ std::clog << "Run: " << ec.message() << std::endl; };
|
||||
|
||||
int main()
|
||||
auto main(int argc, char * argv[]) -> int
|
||||
{
|
||||
try {
|
||||
std::string host = "127.0.0.1";
|
||||
std::string port = "6379";
|
||||
|
||||
if (argc == 3) {
|
||||
host = argv[1];
|
||||
port = argv[2];
|
||||
}
|
||||
|
||||
net::io_context ioc{1};
|
||||
|
||||
auto conn = std::make_shared<connection>(ioc);
|
||||
|
||||
// Resolves the address
|
||||
net::ip::tcp::resolver resv{ioc};
|
||||
auto const res = resv.resolve("127.0.0.1", "6379");
|
||||
auto const res = resv.resolve(host, port);
|
||||
|
||||
// Connect to Redis
|
||||
net::connect(conn->next_layer(), res);
|
||||
|
||||
// Starts a thread that will can io_context::run on which
|
||||
// the connection will run.
|
||||
std::thread t{[conn, &ioc]() {
|
||||
conn->async_run(logger);
|
||||
ioc.run();
|
||||
@@ -51,6 +66,8 @@ int main()
|
||||
req.push("QUIT");
|
||||
|
||||
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
|
||||
|
||||
// Executes commands synchronously.
|
||||
exec(conn, req, adapt(resp));
|
||||
|
||||
std::cout << "Response: " << std::get<1>(resp) << std::endl;
|
||||
@@ -16,12 +16,20 @@ namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
using aedis::adapter::adapt2;
|
||||
|
||||
int main()
|
||||
auto main(int argc, char * argv[]) -> int
|
||||
{
|
||||
try {
|
||||
std::string host = "127.0.0.1";
|
||||
std::string port = "6379";
|
||||
|
||||
if (argc == 3) {
|
||||
host = argv[1];
|
||||
port = argv[2];
|
||||
}
|
||||
|
||||
net::io_context ioc;
|
||||
net::ip::tcp::resolver resv{ioc};
|
||||
auto const res = resv.resolve("127.0.0.1", "6379");
|
||||
auto const res = resv.resolve(host, port);
|
||||
net::ip::tcp::socket socket{ioc};
|
||||
net::connect(socket, res);
|
||||
|
||||
@@ -47,7 +47,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
|
||||
}
|
||||
|
||||
// Called from the main function (see main.cpp)
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
auto conn = std::make_shared<connection>(ex);
|
||||
@@ -58,13 +58,13 @@ auto async_main() -> net::awaitable<void>
|
||||
req.push("HELLO", 3);
|
||||
req.push("SUBSCRIBE", "chat-channel");
|
||||
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await connect(conn, host, port);
|
||||
co_await ((conn->async_run() || publisher(stream, conn) || receiver(conn) ||
|
||||
healthy_checker(conn) || sig.async_wait()) && conn->async_exec(req));
|
||||
}
|
||||
|
||||
#else // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
std::cout << "Requires support for posix streams." << std::endl;
|
||||
co_return;
|
||||
@@ -30,14 +30,15 @@ void print(std::vector<int> const& cont)
|
||||
std::cout << "\n";
|
||||
}
|
||||
|
||||
// Stores the content of some STL containers in Redis.
|
||||
auto store() -> net::awaitable<void>
|
||||
auto run(std::shared_ptr<connection> conn, std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
|
||||
|
||||
// Resolves and connects (from examples/common.hpp to avoid vebosity)
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await connect(conn, host, port);
|
||||
co_await conn->async_run();
|
||||
}
|
||||
|
||||
// Stores the content of some STL containers in Redis.
|
||||
auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
std::vector<int> vec
|
||||
{1, 2, 3, 4, 5, 6};
|
||||
|
||||
@@ -48,70 +49,68 @@ auto store() -> net::awaitable<void>
|
||||
req.push("HELLO", 3);
|
||||
req.push_range("RPUSH", "rpush-key", vec);
|
||||
req.push_range("HSET", "hset-key", map);
|
||||
req.push("QUIT");
|
||||
|
||||
co_await (conn->async_run() || conn->async_exec(req));
|
||||
co_await conn->async_exec(req);
|
||||
}
|
||||
|
||||
auto hgetall() -> net::awaitable<std::map<std::string, std::string>>
|
||||
auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
|
||||
|
||||
// From examples/common.hpp to avoid vebosity
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
|
||||
// A request contains multiple commands.
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("HGETALL", "hset-key");
|
||||
req.push("QUIT");
|
||||
|
||||
// Responses as tuple elements.
|
||||
std::tuple<aedis::ignore, std::map<std::string, std::string>, aedis::ignore> resp;
|
||||
std::tuple<aedis::ignore, std::map<std::string, std::string>> resp;
|
||||
|
||||
// Executes the request and reads the response.
|
||||
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
|
||||
co_return std::get<1>(resp);
|
||||
co_await conn->async_exec(req, adapt(resp));
|
||||
|
||||
print(std::get<1>(resp));
|
||||
}
|
||||
|
||||
// Retrieves in a transaction.
|
||||
auto transaction() -> net::awaitable<void>
|
||||
auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
|
||||
|
||||
// Resolves and connects (from examples/common.hpp to avoid vebosity)
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
req.push("MULTI");
|
||||
req.push("LRANGE", "rpush-key", 0, -1); // Retrieves
|
||||
req.push("HGETALL", "hset-key"); // Retrieves
|
||||
req.push("EXEC");
|
||||
req.push("QUIT");
|
||||
|
||||
std::tuple<
|
||||
aedis::ignore, // hello
|
||||
aedis::ignore, // multi
|
||||
aedis::ignore, // lrange
|
||||
aedis::ignore, // hgetall
|
||||
std::tuple<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>>, // exec
|
||||
aedis::ignore // quit
|
||||
std::tuple<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>> // exec
|
||||
> resp;
|
||||
|
||||
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
|
||||
co_await conn->async_exec(req, adapt(resp));
|
||||
|
||||
print(std::get<0>(std::get<4>(resp)).value());
|
||||
print(std::get<1>(std::get<4>(resp)).value());
|
||||
}
|
||||
|
||||
// Called from the main function (see main.cpp)
|
||||
net::awaitable<void> async_main()
|
||||
auto quit(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
co_await store();
|
||||
co_await transaction();
|
||||
auto const map = co_await hgetall();
|
||||
print(map);
|
||||
resp3::request req;
|
||||
req.push("QUIT");
|
||||
|
||||
co_await conn->async_exec(req);
|
||||
}
|
||||
|
||||
// Called from the main function (see main.cpp)
|
||||
net::awaitable<void> co_main(std::string host, std::string port)
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
auto conn = std::make_shared<connection>(ex);
|
||||
net::co_spawn(ex, run(conn, host, port), net::detached);
|
||||
co_await store(conn);
|
||||
co_await transaction(conn);
|
||||
co_await hgetall(conn);
|
||||
co_await quit(conn);
|
||||
}
|
||||
|
||||
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
|
||||
@@ -45,7 +45,7 @@ auto listener(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
}
|
||||
|
||||
// Called from the main function (see main.cpp)
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
auto conn = std::make_shared<connection>(ex);
|
||||
@@ -54,7 +54,7 @@ auto async_main() -> net::awaitable<void>
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await connect(conn, host, port);
|
||||
co_await ((conn->async_run() || listener(conn) || healthy_checker(conn) ||
|
||||
sig.async_wait()) && conn->async_exec(req));
|
||||
}
|
||||
61
examples/cpp20_intro.cpp
Normal file
61
examples/cpp20_intro.cpp
Normal file
@@ -0,0 +1,61 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
|
||||
#include <aedis.hpp>
|
||||
#include "common/common.hpp"
|
||||
|
||||
namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
using aedis::adapt;
|
||||
using aedis::operation;
|
||||
|
||||
auto run(std::shared_ptr<connection> conn, std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
co_await connect(conn, host, port);
|
||||
co_await conn->async_run();
|
||||
}
|
||||
|
||||
auto hello(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
|
||||
co_await conn->async_exec(req);
|
||||
}
|
||||
|
||||
auto ping(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
resp3::request req;
|
||||
req.push("PING", "Hello world");
|
||||
|
||||
std::tuple<std::string> resp;
|
||||
co_await conn->async_exec(req, adapt(resp));
|
||||
|
||||
std::cout << "PING: " << std::get<0>(resp) << std::endl;
|
||||
}
|
||||
|
||||
auto quit(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
resp3::request req;
|
||||
req.push("QUIT");
|
||||
|
||||
co_await conn->async_exec(req);
|
||||
}
|
||||
|
||||
// Called from the main function (see main.cpp)
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
auto conn = std::make_shared<connection>(ex);
|
||||
net::co_spawn(ex, run(conn, host, port), net::detached);
|
||||
co_await hello(conn);
|
||||
co_await ping(conn);
|
||||
co_await quit(conn);
|
||||
}
|
||||
|
||||
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
|
||||
@@ -16,7 +16,7 @@ using namespace net::experimental::awaitable_operators;
|
||||
using aedis::adapt;
|
||||
|
||||
// Called from the main function (see main.cpp)
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3);
|
||||
@@ -26,7 +26,7 @@ auto async_main() -> net::awaitable<void>
|
||||
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
|
||||
|
||||
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await connect(conn, host, port);
|
||||
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
|
||||
|
||||
std::cout << "PING: " << std::get<1>(resp) << std::endl;
|
||||
@@ -29,7 +29,7 @@ auto verify_certificate(bool, net::ssl::verify_context&) -> bool
|
||||
return true;
|
||||
}
|
||||
|
||||
net::awaitable<void> async_main()
|
||||
net::awaitable<void> co_main(std::string, std::string)
|
||||
{
|
||||
resp3::request req;
|
||||
req.push("HELLO", 3, "AUTH", "aedis", "aedis");
|
||||
@@ -17,12 +17,12 @@ using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>
|
||||
using aedis::adapter::adapt2;
|
||||
using net::ip::tcp;
|
||||
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
|
||||
resolver resv{ex};
|
||||
auto const addrs = co_await resv.async_resolve("127.0.0.1", "6379");
|
||||
auto const addrs = co_await resv.async_resolve(host, port);
|
||||
tcp_socket socket{ex};
|
||||
co_await net::async_connect(socket, addrs);
|
||||
|
||||
@@ -49,14 +49,14 @@ auto resolve_master_address(std::vector<address> const& endpoints) -> net::await
|
||||
co_return address{};
|
||||
}
|
||||
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
// A list of sentinel addresses from which only one is responsive
|
||||
// to simulate sentinels that are down.
|
||||
// A list of sentinel addresses from which only one is responsive.
|
||||
// This simulates sentinels that are down.
|
||||
std::vector<address> const endpoints
|
||||
{ {"foo", "26379"}
|
||||
, {"bar", "26379"}
|
||||
, {"127.0.0.1", "26379"}
|
||||
, {host, port}
|
||||
};
|
||||
|
||||
auto const ep = co_await resolve_master_address(endpoints);
|
||||
@@ -86,7 +86,7 @@ void from_bulk(user& u, std::string_view sv, boost::system::error_code&)
|
||||
u = value_to<user>(jv);
|
||||
}
|
||||
|
||||
net::awaitable<void> async_main()
|
||||
net::awaitable<void> co_main(std::string host, std::string port)
|
||||
{
|
||||
std::set<user> users
|
||||
{{"Joao", "58", "Brazil"} , {"Serge", "60", "France"}};
|
||||
@@ -101,7 +101,7 @@ net::awaitable<void> async_main()
|
||||
|
||||
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
|
||||
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await connect(conn, host, port);
|
||||
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
|
||||
|
||||
for (auto const& e: std::get<2>(resp))
|
||||
@@ -14,7 +14,6 @@
|
||||
namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
using namespace net::experimental::awaitable_operators;
|
||||
using signal_set = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
|
||||
using steady_timer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
|
||||
using aedis::adapt;
|
||||
|
||||
@@ -37,18 +36,18 @@ using aedis::adapt;
|
||||
// Receives pushes.
|
||||
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
|
||||
{
|
||||
for (std::vector<resp3::node<std::string>> resp;;) {
|
||||
using resp_type = std::vector<resp3::node<std::string>>;
|
||||
for (resp_type resp;;) {
|
||||
co_await conn->async_receive(adapt(resp));
|
||||
std::cout << resp.at(1).value << " " << resp.at(2).value << " " << resp.at(3).value << std::endl;
|
||||
resp.clear();
|
||||
}
|
||||
}
|
||||
|
||||
auto async_main() -> net::awaitable<void>
|
||||
auto co_main(std::string host, std::string port) -> net::awaitable<void>
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
auto conn = std::make_shared<connection>(ex);
|
||||
signal_set sig{ex, SIGINT, SIGTERM};
|
||||
steady_timer timer{ex};
|
||||
|
||||
resp3::request req;
|
||||
@@ -57,9 +56,8 @@ auto async_main() -> net::awaitable<void>
|
||||
|
||||
// The loop will reconnect on connection lost. To exit type Ctrl-C twice.
|
||||
for (;;) {
|
||||
co_await connect(conn, "127.0.0.1", "6379");
|
||||
co_await ((conn->async_run() || healthy_checker(conn) || sig.async_wait() ||
|
||||
receiver(conn)) && conn->async_exec(req));
|
||||
co_await connect(conn, host, port);
|
||||
co_await ((conn->async_run() || healthy_checker(conn) || receiver(conn)) && conn->async_exec(req));
|
||||
|
||||
conn->reset_stream();
|
||||
timer.expires_after(std::chrono::seconds{1});
|
||||
@@ -7,17 +7,17 @@
|
||||
#ifndef AEDIS_ADAPT_HPP
|
||||
#define AEDIS_ADAPT_HPP
|
||||
|
||||
#include <tuple>
|
||||
#include <limits>
|
||||
#include <string_view>
|
||||
#include <variant>
|
||||
#include <aedis/resp3/node.hpp>
|
||||
#include <aedis/adapter/adapt.hpp>
|
||||
#include <aedis/adapter/detail/response_traits.hpp>
|
||||
|
||||
#include <boost/mp11.hpp>
|
||||
#include <boost/system.hpp>
|
||||
|
||||
#include <aedis/resp3/node.hpp>
|
||||
#include <aedis/adapter/adapt.hpp>
|
||||
#include <aedis/adapter/detail/response_traits.hpp>
|
||||
#include <tuple>
|
||||
#include <limits>
|
||||
#include <string_view>
|
||||
#include <variant>
|
||||
|
||||
namespace aedis {
|
||||
|
||||
|
||||
@@ -7,6 +7,14 @@
|
||||
#ifndef AEDIS_ADAPTER_ADAPTERS_HPP
|
||||
#define AEDIS_ADAPTER_ADAPTERS_HPP
|
||||
|
||||
#include <aedis/error.hpp>
|
||||
#include <aedis/resp3/type.hpp>
|
||||
#include <aedis/resp3/request.hpp>
|
||||
#include <aedis/resp3/detail/parser.hpp>
|
||||
#include <aedis/resp3/node.hpp>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
|
||||
#include <set>
|
||||
#include <optional>
|
||||
#include <unordered_set>
|
||||
@@ -21,14 +29,6 @@
|
||||
#include <string_view>
|
||||
#include <charconv>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
|
||||
#include <aedis/error.hpp>
|
||||
#include <aedis/resp3/type.hpp>
|
||||
#include <aedis/resp3/request.hpp>
|
||||
#include <aedis/resp3/detail/parser.hpp>
|
||||
#include <aedis/resp3/node.hpp>
|
||||
|
||||
namespace aedis::adapter::detail {
|
||||
|
||||
// Serialization.
|
||||
|
||||
@@ -7,18 +7,18 @@
|
||||
#ifndef AEDIS_ADAPTER_RESPONSE_TRAITS_HPP
|
||||
#define AEDIS_ADAPTER_RESPONSE_TRAITS_HPP
|
||||
|
||||
#include <vector>
|
||||
#include <tuple>
|
||||
#include <string_view>
|
||||
#include <variant>
|
||||
|
||||
#include <boost/mp11.hpp>
|
||||
|
||||
#include <aedis/error.hpp>
|
||||
#include <aedis/resp3/type.hpp>
|
||||
#include <aedis/resp3/read.hpp>
|
||||
#include <aedis/adapter/detail/adapters.hpp>
|
||||
|
||||
#include <boost/mp11.hpp>
|
||||
|
||||
#include <vector>
|
||||
#include <tuple>
|
||||
#include <string_view>
|
||||
#include <variant>
|
||||
|
||||
namespace aedis::adapter::detail {
|
||||
|
||||
using ignore = std::decay_t<decltype(std::ignore)>;
|
||||
|
||||
@@ -7,11 +7,12 @@
|
||||
#ifndef AEDIS_CONNECTION_HPP
|
||||
#define AEDIS_CONNECTION_HPP
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <aedis/detail/connection_base.hpp>
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <aedis/detail/connection_base.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
|
||||
namespace aedis {
|
||||
|
||||
@@ -120,7 +121,7 @@ public:
|
||||
* @param adapter Response adapter.
|
||||
* @param token Asio completion token.
|
||||
*
|
||||
* For an example see echo_server.cpp. The completion token must
|
||||
* For an example see cpp20_echo_server.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
@@ -150,7 +151,7 @@ public:
|
||||
* @param adapter The response adapter.
|
||||
* @param token The Asio completion token.
|
||||
*
|
||||
* For an example see subscriber.cpp. The completion token must
|
||||
* For an example see cpp20_subscriber.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
|
||||
@@ -7,13 +7,10 @@
|
||||
#ifndef AEDIS_CONNECTION_BASE_HPP
|
||||
#define AEDIS_CONNECTION_BASE_HPP
|
||||
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
#include <limits>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <memory_resource>
|
||||
#include <aedis/adapt.hpp>
|
||||
#include <aedis/operation.hpp>
|
||||
#include <aedis/resp3/request.hpp>
|
||||
#include <aedis/detail/connection_ops.hpp>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
@@ -22,10 +19,13 @@
|
||||
#include <boost/asio/deferred.hpp>
|
||||
#include <boost/asio/experimental/channel.hpp>
|
||||
|
||||
#include <aedis/adapt.hpp>
|
||||
#include <aedis/operation.hpp>
|
||||
#include <aedis/resp3/request.hpp>
|
||||
#include <aedis/detail/connection_ops.hpp>
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
#include <limits>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <memory_resource>
|
||||
|
||||
namespace aedis::detail {
|
||||
|
||||
@@ -44,7 +44,6 @@ public:
|
||||
using executor_type = Executor;
|
||||
using this_type = connection_base<Executor, Derived>;
|
||||
|
||||
explicit
|
||||
connection_base(executor_type ex, std::pmr::memory_resource* resource)
|
||||
: writer_timer_{ex}
|
||||
, read_timer_{ex}
|
||||
@@ -104,7 +103,6 @@ public:
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Remove requests that have the flag cancel_if_not_sent_when_connection_lost set
|
||||
auto cancel_on_conn_lost() -> std::size_t
|
||||
{
|
||||
// Must return false if the request should be removed.
|
||||
@@ -113,7 +111,7 @@ public:
|
||||
BOOST_ASSERT(ptr != nullptr);
|
||||
|
||||
if (ptr->is_written()) {
|
||||
return ptr->get_request().get_config().retry_on_connection_lost;
|
||||
return !ptr->get_request().get_config().cancel_if_unresponded;
|
||||
} else {
|
||||
return !ptr->get_request().get_config().cancel_on_connection_lost;
|
||||
}
|
||||
@@ -135,13 +133,8 @@ public:
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <
|
||||
class Adapter = detail::response_traits<void>::adapter_type,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
auto async_exec(
|
||||
resp3::request const& req,
|
||||
Adapter adapter = adapt(),
|
||||
CompletionToken token = CompletionToken{})
|
||||
template <class Adapter, class CompletionToken>
|
||||
auto async_exec(resp3::request const& req, Adapter adapter, CompletionToken token)
|
||||
{
|
||||
BOOST_ASSERT_MSG(req.size() <= adapter.get_supported_response_size(), "Request and adapter have incompatible sizes.");
|
||||
|
||||
@@ -151,12 +144,8 @@ public:
|
||||
>(detail::exec_op<Derived, Adapter>{&derived(), &req, adapter}, token, writer_timer_);
|
||||
}
|
||||
|
||||
template <
|
||||
class Adapter = detail::response_traits<void>::adapter_type,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
auto async_receive(
|
||||
Adapter adapter = adapt(),
|
||||
CompletionToken token = CompletionToken{})
|
||||
template <class Adapter, class CompletionToken>
|
||||
auto async_receive(Adapter adapter, CompletionToken token)
|
||||
{
|
||||
auto f = detail::make_adapter_wrapper(adapter);
|
||||
|
||||
@@ -248,8 +237,8 @@ private:
|
||||
[[nodiscard]] auto get_request() const noexcept -> auto const&
|
||||
{ return *req_; }
|
||||
|
||||
[[nodiscard]] auto get_action() const noexcept
|
||||
{ return action_;}
|
||||
[[nodiscard]] auto stop_requested() const noexcept
|
||||
{ return action_ == action::stop;}
|
||||
|
||||
template <class CompletionToken>
|
||||
auto async_wait(CompletionToken token)
|
||||
|
||||
@@ -7,16 +7,6 @@
|
||||
#ifndef AEDIS_CONNECTION_OPS_HPP
|
||||
#define AEDIS_CONNECTION_OPS_HPP
|
||||
|
||||
#include <array>
|
||||
#include <algorithm>
|
||||
#include <string_view>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/system.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
#include <boost/asio/experimental/parallel_group.hpp>
|
||||
|
||||
#include <aedis/adapt.hpp>
|
||||
#include <aedis/error.hpp>
|
||||
#include <aedis/detail/guarded_operation.hpp>
|
||||
@@ -26,7 +16,15 @@
|
||||
#include <aedis/resp3/write.hpp>
|
||||
#include <aedis/resp3/request.hpp>
|
||||
|
||||
#include <boost/asio/yield.hpp>
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/system.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
#include <boost/asio/experimental/parallel_group.hpp>
|
||||
|
||||
#include <array>
|
||||
#include <algorithm>
|
||||
#include <string_view>
|
||||
|
||||
namespace aedis::detail {
|
||||
|
||||
@@ -45,7 +43,7 @@ struct exec_read_op {
|
||||
, boost::system::error_code ec = {}
|
||||
, std::size_t n = 0)
|
||||
{
|
||||
reenter (coro)
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
// Loop reading the responses to this request.
|
||||
BOOST_ASSERT(!conn->reqs_.empty());
|
||||
@@ -57,7 +55,7 @@ struct exec_read_op {
|
||||
// to hand it to the push consumer. To do that we need
|
||||
// some data in the read bufer.
|
||||
if (conn->read_buffer_.empty()) {
|
||||
yield
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::async_read_until(
|
||||
conn->next_layer(),
|
||||
conn->make_dynamic_buffer(),
|
||||
@@ -68,13 +66,14 @@ struct exec_read_op {
|
||||
// If the next request is a push we have to handle it to
|
||||
// the receive_op wait for it to be done and continue.
|
||||
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push) {
|
||||
yield conn->guarded_op_.async_run(std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->guarded_op_.async_run(std::move(self));
|
||||
AEDIS_CHECK_OP1(conn->cancel(operation::run););
|
||||
continue;
|
||||
}
|
||||
//-----------------------------------
|
||||
|
||||
yield
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
resp3::async_read(
|
||||
conn->next_layer(),
|
||||
conn->make_dynamic_buffer(adapter.get_max_read_size(index)),
|
||||
@@ -116,7 +115,7 @@ struct exec_op {
|
||||
, boost::system::error_code ec = {}
|
||||
, std::size_t n = 0)
|
||||
{
|
||||
reenter (coro)
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
// Check whether the user wants to wait for the connection to
|
||||
// be stablished.
|
||||
@@ -128,10 +127,11 @@ struct exec_op {
|
||||
|
||||
conn->add_request_info(info);
|
||||
EXEC_OP_WAIT:
|
||||
yield info->async_wait(std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
info->async_wait(std::move(self));
|
||||
BOOST_ASSERT(ec == boost::asio::error::operation_aborted);
|
||||
|
||||
if (info->get_action() == Conn::req_info::action::stop) {
|
||||
if (info->stop_requested()) {
|
||||
// Don't have to call remove_request as it has already
|
||||
// been by cancel(exec).
|
||||
return self.complete(ec, 0);
|
||||
@@ -139,9 +139,20 @@ EXEC_OP_WAIT:
|
||||
|
||||
if (is_cancelled(self)) {
|
||||
if (info->is_written()) {
|
||||
self.get_cancellation_state().clear();
|
||||
goto EXEC_OP_WAIT; // Too late, can't cancel.
|
||||
using c_t = boost::asio::cancellation_type;
|
||||
auto const c = self.get_cancellation_state().cancelled();
|
||||
if ((c & c_t::terminal) != c_t::none) {
|
||||
// Cancellation requires closing the connection
|
||||
// otherwise it stays in inconsistent state.
|
||||
conn->cancel(operation::run);
|
||||
return self.complete(ec, 0);
|
||||
} else {
|
||||
// Can't implement other cancelation types, ignoring.
|
||||
self.get_cancellation_state().clear();
|
||||
goto EXEC_OP_WAIT;
|
||||
}
|
||||
} else {
|
||||
// Cancelation can be honored.
|
||||
conn->remove_request(info);
|
||||
self.complete(ec, 0);
|
||||
return;
|
||||
@@ -159,7 +170,7 @@ EXEC_OP_WAIT:
|
||||
BOOST_ASSERT(!conn->reqs_.empty());
|
||||
BOOST_ASSERT(conn->reqs_.front() != nullptr);
|
||||
BOOST_ASSERT(conn->cmds_ != 0);
|
||||
yield
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->async_exec_read(adapter, conn->reqs_.front()->get_number_of_commands(), std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
|
||||
@@ -193,12 +204,12 @@ struct run_op {
|
||||
, boost::system::error_code ec0 = {}
|
||||
, boost::system::error_code ec1 = {})
|
||||
{
|
||||
reenter (coro)
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
conn->write_buffer_.clear();
|
||||
conn->cmds_ = 0;
|
||||
|
||||
yield
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::experimental::make_parallel_group(
|
||||
[this](auto token) { return conn->reader(token);},
|
||||
[this](auto token) { return conn->writer(token);}
|
||||
@@ -232,11 +243,11 @@ struct writer_op {
|
||||
{
|
||||
boost::ignore_unused(n);
|
||||
|
||||
reenter (coro) for (;;)
|
||||
BOOST_ASIO_CORO_REENTER (coro) for (;;)
|
||||
{
|
||||
while (!conn->reqs_.empty() && conn->cmds_ == 0 && conn->write_buffer_.empty()) {
|
||||
conn->coalesce_requests();
|
||||
yield
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::async_write(conn->next_layer(), boost::asio::buffer(conn->write_buffer_), std::move(self));
|
||||
AEDIS_CHECK_OP0(conn->cancel(operation::run););
|
||||
|
||||
@@ -251,7 +262,8 @@ struct writer_op {
|
||||
}
|
||||
}
|
||||
|
||||
yield conn->writer_timer_.async_wait(std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->writer_timer_.async_wait(std::move(self));
|
||||
if (!conn->is_open() || is_cancelled(self)) {
|
||||
// Notice this is not an error of the op, stoping was
|
||||
// requested from the outside, so we complete with
|
||||
@@ -275,9 +287,9 @@ struct reader_op {
|
||||
{
|
||||
boost::ignore_unused(n);
|
||||
|
||||
reenter (coro) for (;;)
|
||||
BOOST_ASIO_CORO_REENTER (coro) for (;;)
|
||||
{
|
||||
yield
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::async_read_until(
|
||||
conn->next_layer(),
|
||||
conn->make_dynamic_buffer(),
|
||||
@@ -311,13 +323,15 @@ struct reader_op {
|
||||
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push
|
||||
|| conn->reqs_.empty()
|
||||
|| (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)) {
|
||||
yield conn->guarded_op_.async_run(std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->guarded_op_.async_run(std::move(self));
|
||||
} else {
|
||||
BOOST_ASSERT(conn->cmds_ != 0);
|
||||
BOOST_ASSERT(!conn->reqs_.empty());
|
||||
BOOST_ASSERT(conn->reqs_.front()->get_number_of_commands() != 0);
|
||||
conn->reqs_.front()->proceed();
|
||||
yield conn->read_timer_.async_wait(std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->read_timer_.async_wait(std::move(self));
|
||||
ec = {};
|
||||
}
|
||||
|
||||
@@ -332,5 +346,4 @@ struct reader_op {
|
||||
|
||||
} // aedis::detail
|
||||
|
||||
#include <boost/asio/unyield.hpp>
|
||||
#endif // AEDIS_CONNECTION_OPS_HPP
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
#define AEDIS_DETAIL_GUARDED_OPERATION_HPP
|
||||
|
||||
#include <boost/asio/experimental/channel.hpp>
|
||||
#include <boost/asio/yield.hpp>
|
||||
|
||||
namespace aedis::detail {
|
||||
|
||||
@@ -21,12 +20,14 @@ struct send_receive_op {
|
||||
template <class Self>
|
||||
void operator()(Self& self, boost::system::error_code ec = {})
|
||||
{
|
||||
reenter (coro)
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
yield channel->async_send(boost::system::error_code{}, 0, std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
channel->async_send(boost::system::error_code{}, 0, std::move(self));
|
||||
AEDIS_CHECK_OP0(;);
|
||||
|
||||
yield channel->async_send(boost::system::error_code{}, 0, std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
channel->async_send(boost::system::error_code{}, 0, std::move(self));
|
||||
AEDIS_CHECK_OP0(;);
|
||||
|
||||
self.complete({});
|
||||
@@ -48,17 +49,20 @@ struct wait_op {
|
||||
, boost::system::error_code ec = {}
|
||||
, std::size_t n = 0)
|
||||
{
|
||||
reenter (coro)
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
yield channel->async_receive(std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
channel->async_receive(std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
|
||||
yield std::move(op)(std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
std::move(op)(std::move(self));
|
||||
AEDIS_CHECK_OP1(channel->cancel(););
|
||||
|
||||
res = n;
|
||||
|
||||
yield channel->async_receive(std::move(self));
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
channel->async_receive(std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
|
||||
self.complete({}, res);
|
||||
@@ -67,14 +71,14 @@ struct wait_op {
|
||||
}
|
||||
};
|
||||
|
||||
template <class Executor = boost::asio::any_io_executor>
|
||||
template <class Executor>
|
||||
class guarded_operation {
|
||||
public:
|
||||
using executor_type = Executor;
|
||||
guarded_operation(executor_type ex) : channel_{ex} {}
|
||||
|
||||
template <class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
auto async_run(CompletionToken&& token = CompletionToken{})
|
||||
template <class CompletionToken>
|
||||
auto async_run(CompletionToken&& token)
|
||||
{
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
@@ -82,8 +86,8 @@ public:
|
||||
>(send_receive_op<executor_type>{&channel_}, token, channel_);
|
||||
}
|
||||
|
||||
template <class Op, class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
auto async_wait(Op&& op, CompletionToken token = CompletionToken{})
|
||||
template <class Op, class CompletionToken>
|
||||
auto async_wait(Op&& op, CompletionToken token)
|
||||
{
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
@@ -104,5 +108,4 @@ private:
|
||||
|
||||
} // aedis::detail
|
||||
|
||||
#include <boost/asio/unyield.hpp>
|
||||
#endif // AEDIS_DETAIL_GUARDED_OPERATION_HPP
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
*/
|
||||
|
||||
#include <aedis/error.hpp>
|
||||
#include <boost/assert.hpp>
|
||||
|
||||
namespace aedis {
|
||||
namespace detail {
|
||||
|
||||
@@ -0,0 +1,156 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <aedis/resp3/detail/parser.hpp>
|
||||
#include <aedis/error.hpp>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
|
||||
#include <charconv>
|
||||
|
||||
namespace aedis::resp3::detail {
|
||||
|
||||
void to_int(int_type& i, std::string_view sv, boost::system::error_code& ec)
|
||||
{
|
||||
auto const res = std::from_chars(sv.data(), sv.data() + std::size(sv), i);
|
||||
if (res.ec != std::errc())
|
||||
ec = error::not_a_number;
|
||||
}
|
||||
|
||||
parser::parser()
|
||||
{
|
||||
sizes_[0] = 2; // The sentinel must be more than 1.
|
||||
}
|
||||
|
||||
auto
|
||||
parser::consume(
|
||||
char const* data,
|
||||
std::size_t n,
|
||||
boost::system::error_code& ec) -> std::pair<node_type, std::size_t>
|
||||
{
|
||||
node_type ret;
|
||||
if (bulk_expected()) {
|
||||
n = bulk_length_ + 2;
|
||||
ret = {bulk_, 1, depth_, {data, bulk_length_}};
|
||||
bulk_ = type::invalid;
|
||||
--sizes_[depth_];
|
||||
|
||||
} else if (sizes_[depth_] != 0) {
|
||||
auto const t = to_type(*data);
|
||||
switch (t) {
|
||||
case type::streamed_string_part:
|
||||
{
|
||||
to_int(bulk_length_ , std::string_view{data + 1, n - 3}, ec);
|
||||
if (ec)
|
||||
return std::make_pair(node_type{}, 0);
|
||||
|
||||
if (bulk_length_ == 0) {
|
||||
ret = {type::streamed_string_part, 1, depth_, {}};
|
||||
sizes_[depth_] = 0; // We are done.
|
||||
bulk_ = type::invalid;
|
||||
} else {
|
||||
bulk_ = type::streamed_string_part;
|
||||
}
|
||||
} break;
|
||||
case type::blob_error:
|
||||
case type::verbatim_string:
|
||||
case type::blob_string:
|
||||
{
|
||||
if (data[1] == '?') {
|
||||
// NOTE: This can only be triggered with blob_string.
|
||||
// Trick: A streamed string is read as an aggregate
|
||||
// of infinite lenght. When the streaming is done
|
||||
// the server is supposed to send a part with length
|
||||
// 0.
|
||||
sizes_[++depth_] = (std::numeric_limits<std::size_t>::max)();
|
||||
ret = {type::streamed_string, 0, depth_, {}};
|
||||
} else {
|
||||
to_int(bulk_length_ , std::string_view{data + 1, n - 3} , ec);
|
||||
if (ec)
|
||||
return std::make_pair(node_type{}, 0);
|
||||
|
||||
bulk_ = t;
|
||||
}
|
||||
} break;
|
||||
case type::boolean:
|
||||
{
|
||||
if (n == 3) {
|
||||
ec = error::empty_field;
|
||||
return std::make_pair(node_type{}, 0);
|
||||
}
|
||||
|
||||
if (data[1] != 'f' && data[1] != 't') {
|
||||
ec = error::unexpected_bool_value;
|
||||
return std::make_pair(node_type{}, 0);
|
||||
}
|
||||
|
||||
ret = {t, 1, depth_, {data + 1, n - 3}};
|
||||
--sizes_[depth_];
|
||||
} break;
|
||||
case type::doublean:
|
||||
case type::big_number:
|
||||
case type::number:
|
||||
{
|
||||
if (n == 3) {
|
||||
ec = error::empty_field;
|
||||
return std::make_pair(node_type{}, 0);
|
||||
}
|
||||
|
||||
ret = {t, 1, depth_, {data + 1, n - 3}};
|
||||
--sizes_[depth_];
|
||||
} break;
|
||||
case type::simple_error:
|
||||
case type::simple_string:
|
||||
{
|
||||
ret = {t, 1, depth_, {&data[1], n - 3}};
|
||||
--sizes_[depth_];
|
||||
} break;
|
||||
case type::null:
|
||||
{
|
||||
ret = {type::null, 1, depth_, {}};
|
||||
--sizes_[depth_];
|
||||
} break;
|
||||
case type::push:
|
||||
case type::set:
|
||||
case type::array:
|
||||
case type::attribute:
|
||||
case type::map:
|
||||
{
|
||||
int_type l = -1;
|
||||
to_int(l, std::string_view{data + 1, n - 3}, ec);
|
||||
if (ec)
|
||||
return std::make_pair(node_type{}, 0);
|
||||
|
||||
ret = {t, l, depth_, {}};
|
||||
if (l == 0) {
|
||||
--sizes_[depth_];
|
||||
} else {
|
||||
if (depth_ == max_embedded_depth) {
|
||||
ec = error::exceeeds_max_nested_depth;
|
||||
return std::make_pair(node_type{}, 0);
|
||||
}
|
||||
|
||||
++depth_;
|
||||
|
||||
sizes_[depth_] = l * element_multiplicity(t);
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
{
|
||||
ec = error::invalid_data_type;
|
||||
return std::make_pair(node_type{}, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (sizes_[depth_] == 0) {
|
||||
--depth_;
|
||||
--sizes_[depth_];
|
||||
}
|
||||
|
||||
return std::make_pair(ret, n);
|
||||
}
|
||||
} // aedis::resp3::detail
|
||||
|
||||
@@ -7,38 +7,22 @@
|
||||
#ifndef AEDIS_RESP3_PARSER_HPP
|
||||
#define AEDIS_RESP3_PARSER_HPP
|
||||
|
||||
#include <aedis/resp3/node.hpp>
|
||||
|
||||
#include <array>
|
||||
#include <limits>
|
||||
#include <system_error>
|
||||
#include <charconv>
|
||||
#include <string_view>
|
||||
#include <cstdint>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
#include <aedis/error.hpp>
|
||||
#include <aedis/resp3/node.hpp>
|
||||
|
||||
namespace aedis::resp3::detail {
|
||||
|
||||
using int_type = std::uint64_t;
|
||||
|
||||
inline
|
||||
void to_int(int_type& i, std::string_view sv, boost::system::error_code& ec)
|
||||
{
|
||||
auto const res = std::from_chars(sv.data(), sv.data() + std::size(sv), i);
|
||||
if (res.ec != std::errc())
|
||||
ec = error::not_a_number;
|
||||
}
|
||||
|
||||
|
||||
template <class ResponseAdapter>
|
||||
class parser {
|
||||
private:
|
||||
using node_type = node<std::string_view>;
|
||||
static constexpr std::size_t max_embedded_depth = 5;
|
||||
|
||||
ResponseAdapter adapter_;
|
||||
|
||||
// The current depth. Simple data types will have depth 0, whereas
|
||||
// the elements of aggregates will have depth 1. Embedded types
|
||||
// will have increasing depth.
|
||||
@@ -57,176 +41,27 @@ private:
|
||||
type bulk_ = type::invalid;
|
||||
|
||||
public:
|
||||
explicit parser(ResponseAdapter adapter)
|
||||
: adapter_{adapter}
|
||||
{
|
||||
sizes_[0] = 2; // The sentinel must be more than 1.
|
||||
}
|
||||
parser();
|
||||
|
||||
// Returns the number of bytes that have been consumed.
|
||||
auto
|
||||
consume(char const* data, std::size_t n, boost::system::error_code& ec) -> std::size_t
|
||||
{
|
||||
if (bulk_ != type::invalid) {
|
||||
n = bulk_length_ + 2;
|
||||
switch (bulk_) {
|
||||
case type::streamed_string_part:
|
||||
{
|
||||
BOOST_ASSERT(bulk_length_ != 0);
|
||||
adapter_({bulk_, 1, depth_, {data, bulk_length_}}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
} break;
|
||||
default:
|
||||
{
|
||||
adapter_({bulk_, 1, depth_, {data, bulk_length_}}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
bulk_ = type::invalid;
|
||||
--sizes_[depth_];
|
||||
|
||||
} else if (sizes_[depth_] != 0) {
|
||||
auto const t = to_type(*data);
|
||||
switch (t) {
|
||||
case type::streamed_string_part:
|
||||
{
|
||||
to_int(bulk_length_ , std::string_view{data + 1, n - 3}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
if (bulk_length_ == 0) {
|
||||
adapter_({type::streamed_string_part, 1, depth_, {}}, ec);
|
||||
sizes_[depth_] = 0; // We are done.
|
||||
} else {
|
||||
bulk_ = type::streamed_string_part;
|
||||
}
|
||||
} break;
|
||||
case type::blob_error:
|
||||
case type::verbatim_string:
|
||||
case type::blob_string:
|
||||
{
|
||||
if (data[1] == '?') {
|
||||
// NOTE: This can only be triggered with blob_string.
|
||||
// Trick: A streamed string is read as an aggregate
|
||||
// of infinite lenght. When the streaming is done
|
||||
// the server is supposed to send a part with length
|
||||
// 0.
|
||||
sizes_[++depth_] = (std::numeric_limits<std::size_t>::max)();
|
||||
} else {
|
||||
to_int(bulk_length_ , std::string_view{data + 1, n - 3} , ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
bulk_ = t;
|
||||
}
|
||||
} break;
|
||||
case type::boolean:
|
||||
{
|
||||
if (n == 3) {
|
||||
ec = error::empty_field;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (data[1] != 'f' && data[1] != 't') {
|
||||
ec = error::unexpected_bool_value;
|
||||
return 0;
|
||||
}
|
||||
|
||||
adapter_({t, 1, depth_, {data + 1, n - 3}}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
--sizes_[depth_];
|
||||
} break;
|
||||
case type::doublean:
|
||||
case type::big_number:
|
||||
case type::number:
|
||||
{
|
||||
if (n == 3) {
|
||||
ec = error::empty_field;
|
||||
return 0;
|
||||
}
|
||||
|
||||
adapter_({t, 1, depth_, {data + 1, n - 3}}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
--sizes_[depth_];
|
||||
} break;
|
||||
case type::simple_error:
|
||||
case type::simple_string:
|
||||
{
|
||||
adapter_({t, 1, depth_, {&data[1], n - 3}}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
--sizes_[depth_];
|
||||
} break;
|
||||
case type::null:
|
||||
{
|
||||
adapter_({type::null, 1, depth_, {}}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
--sizes_[depth_];
|
||||
} break;
|
||||
case type::push:
|
||||
case type::set:
|
||||
case type::array:
|
||||
case type::attribute:
|
||||
case type::map:
|
||||
{
|
||||
int_type l = -1;
|
||||
to_int(l, std::string_view{data + 1, n - 3}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
adapter_({t, l, depth_, {}}, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
if (l == 0) {
|
||||
--sizes_[depth_];
|
||||
} else {
|
||||
if (depth_ == max_embedded_depth) {
|
||||
ec = error::exceeeds_max_nested_depth;
|
||||
return 0;
|
||||
}
|
||||
|
||||
++depth_;
|
||||
|
||||
sizes_[depth_] = l * element_multiplicity(t);
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
{
|
||||
ec = error::invalid_data_type;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (sizes_[depth_] == 0) {
|
||||
--depth_;
|
||||
--sizes_[depth_];
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
consume(
|
||||
char const* data,
|
||||
std::size_t n,
|
||||
boost::system::error_code& ec) -> std::pair<node_type, std::size_t>;
|
||||
|
||||
// Returns true when the parser is done with the current message.
|
||||
[[nodiscard]] auto done() const noexcept
|
||||
{ return depth_ == 0 && bulk_ == type::invalid; }
|
||||
|
||||
// The bulk type expected in the next read. If none is expected returns
|
||||
// type::invalid.
|
||||
[[nodiscard]] auto bulk() const noexcept { return bulk_; }
|
||||
// The bulk type expected in the next read. If none is expected
|
||||
// returns type::invalid.
|
||||
[[nodiscard]] auto bulk_expected() const noexcept -> bool
|
||||
{ return bulk_ != type::invalid; }
|
||||
|
||||
// The length expected in the the next bulk.
|
||||
[[nodiscard]] auto bulk_length() const noexcept { return bulk_length_; }
|
||||
[[nodiscard]] auto bulk_length() const noexcept
|
||||
{ return bulk_length_; }
|
||||
};
|
||||
|
||||
} // detail::resp3::aedis
|
||||
|
||||
@@ -7,15 +7,15 @@
|
||||
#ifndef AEDIS_RESP3_READ_OPS_HPP
|
||||
#define AEDIS_RESP3_READ_OPS_HPP
|
||||
|
||||
#include <string_view>
|
||||
#include <aedis/resp3/detail/parser.hpp>
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
#include <boost/asio/read.hpp>
|
||||
#include <boost/asio/read_until.hpp>
|
||||
#include <boost/asio/coroutine.hpp>
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
#include <aedis/resp3/detail/parser.hpp>
|
||||
|
||||
#include <boost/asio/yield.hpp>
|
||||
#include <string_view>
|
||||
|
||||
namespace aedis::detail
|
||||
{
|
||||
@@ -61,7 +61,8 @@ class parse_op {
|
||||
private:
|
||||
AsyncReadStream& stream_;
|
||||
DynamicBuffer buf_;
|
||||
parser<ResponseAdapter> parser_;
|
||||
parser parser_;
|
||||
ResponseAdapter adapter_;
|
||||
std::size_t consumed_ = 0;
|
||||
std::size_t buffer_size_ = 0;
|
||||
boost::asio::coroutine coro_{};
|
||||
@@ -70,7 +71,7 @@ public:
|
||||
parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter)
|
||||
: stream_ {stream}
|
||||
, buf_ {std::move(buf)}
|
||||
, parser_ {std::move(adapter)}
|
||||
, adapter_ {std::move(adapter)}
|
||||
{ }
|
||||
|
||||
template <class Self>
|
||||
@@ -78,9 +79,9 @@ public:
|
||||
, boost::system::error_code ec = {}
|
||||
, std::size_t n = 0)
|
||||
{
|
||||
reenter (coro_) for (;;) {
|
||||
if (parser_.bulk() == type::invalid) {
|
||||
yield
|
||||
BOOST_ASIO_CORO_REENTER (coro_) for (;;) {
|
||||
if (!parser_.bulk_expected()) {
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::async_read_until(stream_, buf_, "\r\n", std::move(self));
|
||||
AEDIS_CHECK_OP1(;);
|
||||
} else {
|
||||
@@ -95,7 +96,7 @@ public:
|
||||
buffer_size_ = buf_.size();
|
||||
buf_.grow(parser_.bulk_length() + 2 - buffer_size_);
|
||||
|
||||
yield
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
boost::asio::async_read(
|
||||
stream_,
|
||||
buf_.data(buffer_size_, parser_.bulk_length() + 2 - buffer_size_),
|
||||
@@ -108,14 +109,18 @@ public:
|
||||
BOOST_ASSERT(buf_.size() >= n);
|
||||
}
|
||||
|
||||
n = parser_.consume(static_cast<char const*>(buf_.data(0, n).data()), n, ec);
|
||||
if (ec) {
|
||||
self.complete(ec, 0);
|
||||
return;
|
||||
auto const res = parser_.consume(static_cast<char const*>(buf_.data(0, n).data()), n, ec);
|
||||
if (ec)
|
||||
return self.complete(ec, 0);
|
||||
|
||||
if (!parser_.bulk_expected()) {
|
||||
adapter_(res.first, ec);
|
||||
if (ec)
|
||||
return self.complete(ec, 0);
|
||||
}
|
||||
|
||||
buf_.consume(n);
|
||||
consumed_ += n;
|
||||
buf_.consume(res.second);
|
||||
consumed_ += res.second;
|
||||
if (parser_.done()) {
|
||||
self.complete({}, consumed_);
|
||||
return;
|
||||
@@ -126,5 +131,4 @@ public:
|
||||
|
||||
} // aedis::resp3::detail
|
||||
|
||||
#include <boost/asio/unyield.hpp>
|
||||
#endif // AEDIS_RESP3_READ_OPS_HPP
|
||||
|
||||
@@ -4,12 +4,13 @@
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <string_view>
|
||||
#include <aedis/resp3/request.hpp>
|
||||
|
||||
#include <string_view>
|
||||
|
||||
namespace aedis::resp3::detail {
|
||||
|
||||
auto has_push_response(std::string_view cmd) -> bool
|
||||
auto has_response(std::string_view cmd) -> bool
|
||||
{
|
||||
if (cmd == "SUBSCRIBE") return true;
|
||||
if (cmd == "PSUBSCRIBE") return true;
|
||||
|
||||
@@ -4,8 +4,8 @@
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <boost/assert.hpp>
|
||||
#include <aedis/resp3/type.hpp>
|
||||
#include <boost/assert.hpp>
|
||||
|
||||
namespace aedis::resp3 {
|
||||
|
||||
@@ -27,6 +27,7 @@ auto to_string(type t) -> char const*
|
||||
case type::blob_error: return "blob_error";
|
||||
case type::verbatim_string: return "verbatim_string";
|
||||
case type::blob_string: return "blob_string";
|
||||
case type::streamed_string: return "streamed_string";
|
||||
case type::streamed_string_part: return "streamed_string_part";
|
||||
default: return "invalid";
|
||||
}
|
||||
|
||||
@@ -14,11 +14,33 @@ namespace aedis::resp3 {
|
||||
/** \brief A node in the response tree.
|
||||
* \ingroup high-level-api
|
||||
*
|
||||
* Redis responses are the pre-order view of the response tree (see
|
||||
* https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR).
|
||||
* RESP3 can contain recursive data structures: A map of sets of
|
||||
* vector of etc. As it is parsed each element is passed to user
|
||||
* callbacks (push parser), the `aedis::adapt` function. The signature of this
|
||||
* callback is `f(resp3::node<std::string_view)`. This class is called a node
|
||||
* because it can be seen as the element of the response tree. It
|
||||
* is a template so that users can use it with owing strings e.g.
|
||||
* `std::string` or `boost::static_string` etc. if they decide to use a node as
|
||||
* response type, for example, to read a non-aggregate data-type use
|
||||
*
|
||||
* \remark Any Redis response can be received in an array of nodes,
|
||||
* for example \c std::vector<node<std::string>>.
|
||||
* ```cpp
|
||||
* resp3::node<std::string> resp;
|
||||
* co_await conn->async_exec(req, adapt(resp));
|
||||
* ```
|
||||
*
|
||||
* for an aggregate use instead
|
||||
*
|
||||
* ```cpp
|
||||
* std::vector<resp3::node<std::string>> resp; co_await
|
||||
* conn->async_exec(req, adapt(resp));
|
||||
* ```
|
||||
*
|
||||
* The vector will contain the
|
||||
* [pre-order](https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR)
|
||||
* view of the response tree. Any Redis response can be received in
|
||||
* an array of nodes as shown above.
|
||||
*
|
||||
* \tparam String A `std::string`-like type.
|
||||
*/
|
||||
template <class String>
|
||||
struct node {
|
||||
|
||||
@@ -59,11 +59,11 @@ read(
|
||||
ResponseAdapter adapter,
|
||||
boost::system::error_code& ec) -> std::size_t
|
||||
{
|
||||
detail::parser<ResponseAdapter> p {adapter};
|
||||
detail::parser p;
|
||||
std::size_t n = 0;
|
||||
std::size_t consumed = 0;
|
||||
do {
|
||||
if (p.bulk() == type::invalid) {
|
||||
if (!p.bulk_expected()) {
|
||||
n = boost::asio::read_until(stream, buf, "\r\n", ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
@@ -81,12 +81,18 @@ read(
|
||||
}
|
||||
|
||||
auto const* data = static_cast<char const*>(buf.data(0, n).data());
|
||||
n = p.consume(data, n, ec);
|
||||
auto const res = p.consume(data, n, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
|
||||
buf.consume(n);
|
||||
consumed += n;
|
||||
if (!p.bulk_expected()) {
|
||||
adapter(res.first, ec);
|
||||
if (ec)
|
||||
return 0;
|
||||
}
|
||||
|
||||
buf.consume(res.second);
|
||||
consumed += res.second;
|
||||
} while (!p.done());
|
||||
|
||||
return consumed;
|
||||
|
||||
@@ -7,10 +7,11 @@
|
||||
#ifndef AEDIS_RESP3_REQUEST_HPP
|
||||
#define AEDIS_RESP3_REQUEST_HPP
|
||||
|
||||
#include <aedis/resp3/type.hpp>
|
||||
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
#include <memory_resource>
|
||||
#include <aedis/resp3/type.hpp>
|
||||
|
||||
// NOTE: Consider detecting tuples in the type in the parameter pack
|
||||
// to calculate the header size correctly.
|
||||
@@ -62,7 +63,7 @@ void to_bulk(Request& to, T n)
|
||||
|
||||
namespace detail {
|
||||
|
||||
auto has_push_response(std::string_view cmd) -> bool;
|
||||
auto has_response(std::string_view cmd) -> bool;
|
||||
|
||||
template <class T>
|
||||
struct add_bulk_impl {
|
||||
@@ -169,35 +170,37 @@ class request {
|
||||
public:
|
||||
/// Request configuration options.
|
||||
struct config {
|
||||
/** \brief Setting it to true will cause
|
||||
* `aedis::connection::async_exec` to complete with error if the
|
||||
/** \brief If `true`
|
||||
* `aedis::connection::async_exec` will complete with error if the
|
||||
* connection is lost. Affects only requests that haven't been
|
||||
* sent yet.
|
||||
*/
|
||||
bool cancel_on_connection_lost = true;
|
||||
|
||||
/** \brief If true the request will be coalesced with other
|
||||
/** \brief If `true` the request will be coalesced with other
|
||||
* requests, see https://redis.io/topics/pipelining. Otherwise
|
||||
* the request is sent individually.
|
||||
*/
|
||||
bool coalesce = true;
|
||||
|
||||
/** \brief If true, the request will complete with error if the
|
||||
* call happens before the connection with Redis was established.
|
||||
/** \brief If `true` the request will complete with
|
||||
* aedis::error::not_connected if `async_exec` is called before
|
||||
* the connection with Redis was established.
|
||||
*/
|
||||
bool cancel_if_not_connected = false;
|
||||
|
||||
/** \brief If true `aedis::connection::async_exec` will not
|
||||
* cancel this request if the connection is lost. Affects only
|
||||
* requests that have been written to the socket but remained
|
||||
* unresponded when `aedis::connection::async_run` completed.
|
||||
/** \brief If `false` `aedis::connection::async_exec` will not
|
||||
* automatically cancel this request if the connection is lost.
|
||||
* Affects only requests that have been written to the socket
|
||||
* but remained unresponded when `aedis::connection::async_run`
|
||||
* completed.
|
||||
*/
|
||||
bool retry_on_connection_lost = false;
|
||||
bool cancel_if_unresponded = true;
|
||||
|
||||
/** \brief If this request has a HELLO command and this flag is
|
||||
* true, the `aedis::connection` will move it to the front of
|
||||
/** \brief If this request has a `HELLO` command and this flag is
|
||||
* `true`, the `aedis::connection` will move it to the front of
|
||||
* the queue of awaiting requests. This makes it possible to
|
||||
* send HELLO and authenticate before other commands are sent.
|
||||
* send `HELLO` and authenticate before other commands are sent.
|
||||
*/
|
||||
bool hello_with_priority = true;
|
||||
};
|
||||
@@ -208,7 +211,7 @@ public:
|
||||
* \param resource Memory resource.
|
||||
*/
|
||||
explicit
|
||||
request(config cfg = config{true, true, false, false, true},
|
||||
request(config cfg = config{true, true, false, true, true},
|
||||
std::pmr::memory_resource* resource = std::pmr::get_default_resource())
|
||||
: cfg_{cfg}, payload_(resource) {}
|
||||
|
||||
@@ -384,7 +387,7 @@ public:
|
||||
private:
|
||||
void check_cmd(std::string_view cmd)
|
||||
{
|
||||
if (!detail::has_push_response(cmd))
|
||||
if (!detail::has_response(cmd))
|
||||
++commands_;
|
||||
|
||||
if (cmd == "HELLO")
|
||||
|
||||
@@ -50,6 +50,8 @@ enum class type
|
||||
/// Simple
|
||||
blob_string,
|
||||
/// Simple
|
||||
streamed_string,
|
||||
/// Simple
|
||||
streamed_string_part,
|
||||
/// Invalid
|
||||
invalid
|
||||
|
||||
@@ -7,3 +7,4 @@
|
||||
#include <aedis/impl/error.ipp>
|
||||
#include <aedis/resp3/impl/request.ipp>
|
||||
#include <aedis/resp3/impl/type.ipp>
|
||||
#include <aedis/resp3/detail/impl/parser.ipp>
|
||||
|
||||
@@ -7,11 +7,12 @@
|
||||
#ifndef AEDIS_SSL_CONNECTION_HPP
|
||||
#define AEDIS_SSL_CONNECTION_HPP
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <aedis/detail/connection_base.hpp>
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <aedis/detail/connection_base.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
|
||||
namespace aedis::ssl {
|
||||
|
||||
|
||||
@@ -122,123 +122,3 @@ BOOST_AUTO_TEST_CASE(cancel_request_if_not_connected)
|
||||
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
// TODO: This test is broken.
|
||||
BOOST_AUTO_TEST_CASE(request_retry_false)
|
||||
{
|
||||
resp3::request req0;
|
||||
req0.get_config().coalesce = false;
|
||||
req0.get_config().cancel_on_connection_lost = true;
|
||||
req0.push("HELLO", 3);
|
||||
|
||||
resp3::request req1;
|
||||
req1.get_config().coalesce = true;
|
||||
req1.get_config().cancel_on_connection_lost = true;
|
||||
req1.push("BLPOP", "any", 0);
|
||||
|
||||
resp3::request req2;
|
||||
req2.get_config().coalesce = true;
|
||||
req2.get_config().cancel_on_connection_lost = false;
|
||||
req2.get_config().retry_on_connection_lost = false;
|
||||
req2.push("PING");
|
||||
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
|
||||
net::steady_timer st{ioc};
|
||||
st.expires_after(std::chrono::seconds{1});
|
||||
st.async_wait([&](auto){
|
||||
// Cancels the request before receiving the response. This
|
||||
// should cause the second request to complete with error
|
||||
// although it has cancel_on_connection_lost = false.
|
||||
conn.cancel(aedis::operation::run);
|
||||
});
|
||||
|
||||
auto const endpoints = resolve();
|
||||
net::connect(conn.next_layer(), endpoints);
|
||||
|
||||
conn.async_exec(req0, adapt(), [](auto ec, auto){
|
||||
BOOST_TEST(!ec);
|
||||
});
|
||||
|
||||
conn.async_exec(req1, adapt(), [](auto ec, auto){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
});
|
||||
|
||||
conn.async_exec(req2, adapt(), [](auto ec, auto){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
});
|
||||
|
||||
conn.async_run([](auto ec){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(request_retry_true)
|
||||
{
|
||||
resp3::request req0;
|
||||
req0.get_config().coalesce = false;
|
||||
req0.get_config().cancel_on_connection_lost = true;
|
||||
req0.push("HELLO", 3);
|
||||
|
||||
resp3::request req1;
|
||||
req1.get_config().coalesce = true;
|
||||
req1.get_config().cancel_on_connection_lost = true;
|
||||
req1.push("BLPOP", "any", 0);
|
||||
|
||||
resp3::request req2;
|
||||
req2.get_config().coalesce = true;
|
||||
req2.get_config().cancel_on_connection_lost = false;
|
||||
req2.get_config().retry_on_connection_lost = true;
|
||||
req2.push("PING");
|
||||
|
||||
resp3::request req3;
|
||||
req3.get_config().coalesce = true;
|
||||
req3.get_config().cancel_on_connection_lost = true;
|
||||
req3.get_config().retry_on_connection_lost = false;
|
||||
req3.push("QUIT");
|
||||
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
|
||||
net::steady_timer st{ioc};
|
||||
st.expires_after(std::chrono::seconds{1});
|
||||
st.async_wait([&](auto){
|
||||
// Cancels the request before receiving the response. This
|
||||
// should cause the second request to complete with error
|
||||
// although it has cancel_on_connection_lost = false.
|
||||
conn.cancel(aedis::operation::run);
|
||||
});
|
||||
|
||||
auto const endpoints = resolve();
|
||||
net::connect(conn.next_layer(), endpoints);
|
||||
|
||||
conn.async_exec(req0, adapt(), [](auto ec, auto){
|
||||
BOOST_TEST(!ec);
|
||||
});
|
||||
|
||||
conn.async_exec(req1, adapt(), [](auto ec, auto){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
});
|
||||
|
||||
conn.async_exec(req2, adapt(), [&](auto ec, auto){
|
||||
BOOST_TEST(!ec);
|
||||
conn.async_exec(req3, adapt(), [&](auto ec, auto){
|
||||
BOOST_TEST(!ec);
|
||||
});
|
||||
});
|
||||
|
||||
conn.async_run([&](auto ec){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
conn.reset_stream();
|
||||
net::connect(conn.next_layer(), endpoints);
|
||||
conn.async_run([&](auto ec){
|
||||
std::cout << ec.message() << std::endl;
|
||||
BOOST_TEST(!ec);
|
||||
});
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>
|
||||
// Will be cancelled before it is written.
|
||||
resp3::request req2;
|
||||
req2.get_config().coalesce = false;
|
||||
//req2.get_config().cancel_on_connection_lost = true;
|
||||
req2.get_config().cancel_on_connection_lost = true;
|
||||
req2.push("PING");
|
||||
|
||||
net::steady_timer st{ex};
|
||||
@@ -123,11 +123,9 @@ auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>
|
||||
st.async_wait(redir(ec3))
|
||||
);
|
||||
|
||||
BOOST_TEST(!ec1);
|
||||
BOOST_CHECK_EQUAL(ec1, net::error::basic_errors::operation_aborted);
|
||||
BOOST_CHECK_EQUAL(ec2, net::error::basic_errors::operation_aborted);
|
||||
BOOST_TEST(!ec3);
|
||||
|
||||
conn->cancel(operation::run);
|
||||
}
|
||||
|
||||
auto cancel_of_req_written_on_run_canceled() -> net::awaitable<void>
|
||||
@@ -138,7 +136,7 @@ auto cancel_of_req_written_on_run_canceled() -> net::awaitable<void>
|
||||
|
||||
resp3::request req1;
|
||||
req1.get_config().cancel_on_connection_lost = true;
|
||||
req1.get_config().retry_on_connection_lost = false;
|
||||
req1.get_config().cancel_if_unresponded = true;
|
||||
req1.push("BLPOP", "any", 0);
|
||||
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
|
||||
148
tests/conn_exec_retry.cpp
Normal file
148
tests/conn_exec_retry.cpp
Normal file
@@ -0,0 +1,148 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/system/errc.hpp>
|
||||
|
||||
#define BOOST_TEST_MODULE low level
|
||||
#include <boost/test/included/unit_test.hpp>
|
||||
|
||||
#include <aedis.hpp>
|
||||
#include <aedis/src.hpp>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
using error_code = boost::system::error_code;
|
||||
using connection = aedis::connection;
|
||||
using aedis::adapt;
|
||||
|
||||
BOOST_AUTO_TEST_CASE(request_retry_false)
|
||||
{
|
||||
resp3::request req0;
|
||||
req0.get_config().coalesce = false;
|
||||
req0.get_config().cancel_on_connection_lost = true;
|
||||
req0.push("HELLO", 3);
|
||||
|
||||
resp3::request req1;
|
||||
req1.get_config().coalesce = true;
|
||||
req1.get_config().cancel_on_connection_lost = true;
|
||||
req1.push("BLPOP", "any", 0);
|
||||
|
||||
resp3::request req2;
|
||||
req2.get_config().coalesce = true;
|
||||
req2.get_config().cancel_on_connection_lost = false;
|
||||
req2.get_config().cancel_if_unresponded = true;
|
||||
req2.push("PING");
|
||||
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
|
||||
net::steady_timer st{ioc};
|
||||
st.expires_after(std::chrono::seconds{1});
|
||||
st.async_wait([&](auto){
|
||||
// Cancels the request before receiving the response. This
|
||||
// should cause the third request to complete with error
|
||||
// although it has cancel_on_connection_lost = false. The reason
|
||||
// being is has already been written so
|
||||
// cancel_on_connection_lost does not apply.
|
||||
conn.cancel(aedis::operation::run);
|
||||
});
|
||||
|
||||
auto const endpoints = resolve();
|
||||
net::connect(conn.next_layer(), endpoints);
|
||||
|
||||
conn.async_exec(req0, adapt(), [](auto ec, auto){
|
||||
BOOST_TEST(!ec);
|
||||
});
|
||||
|
||||
conn.async_exec(req1, adapt(), [](auto ec, auto){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
});
|
||||
|
||||
conn.async_exec(req2, adapt(), [](auto ec, auto){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
});
|
||||
|
||||
conn.async_run([](auto ec){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(request_retry_true)
|
||||
{
|
||||
resp3::request req0;
|
||||
req0.get_config().coalesce = false;
|
||||
req0.get_config().cancel_on_connection_lost = true;
|
||||
req0.push("HELLO", 3);
|
||||
|
||||
resp3::request req1;
|
||||
req1.get_config().coalesce = true;
|
||||
req1.get_config().cancel_on_connection_lost = true;
|
||||
req1.push("BLPOP", "any", 0);
|
||||
|
||||
resp3::request req2;
|
||||
req2.get_config().coalesce = true;
|
||||
req2.get_config().cancel_on_connection_lost = false;
|
||||
req2.get_config().cancel_if_unresponded = false;
|
||||
req2.push("PING");
|
||||
|
||||
resp3::request req3;
|
||||
req3.get_config().coalesce = true;
|
||||
req3.get_config().cancel_on_connection_lost = true;
|
||||
req3.get_config().cancel_if_unresponded = true;
|
||||
req3.push("QUIT");
|
||||
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
|
||||
net::steady_timer st{ioc};
|
||||
st.expires_after(std::chrono::seconds{1});
|
||||
st.async_wait([&](auto){
|
||||
// Cancels the request before receiving the response. This
|
||||
// should cause the thrid request to not complete with error
|
||||
// since it has cancel_if_unresponded = true and cancellation commes
|
||||
// after it was written.
|
||||
conn.cancel(aedis::operation::run);
|
||||
});
|
||||
|
||||
auto const endpoints = resolve();
|
||||
net::connect(conn.next_layer(), endpoints);
|
||||
|
||||
conn.async_exec(req0, adapt(), [](auto ec, auto){
|
||||
BOOST_TEST(!ec);
|
||||
});
|
||||
|
||||
conn.async_exec(req1, adapt(), [](auto ec, auto){
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
});
|
||||
|
||||
conn.async_exec(req2, adapt(), [&](auto ec, auto){
|
||||
BOOST_TEST(!ec);
|
||||
conn.async_exec(req3, adapt(), [&](auto ec, auto){
|
||||
BOOST_TEST(!ec);
|
||||
});
|
||||
});
|
||||
|
||||
conn.async_run([&](auto ec){
|
||||
// The first cacellation.
|
||||
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
|
||||
conn.reset_stream();
|
||||
|
||||
// Reconnects and runs again to test req3.
|
||||
net::connect(conn.next_layer(), endpoints);
|
||||
conn.async_run([&](auto ec){
|
||||
std::cout << ec.message() << std::endl;
|
||||
BOOST_TEST(!ec);
|
||||
});
|
||||
});
|
||||
|
||||
ioc.run();
|
||||
}
|
||||
@@ -73,7 +73,7 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
|
||||
resp3::request req1;
|
||||
req1.get_config().cancel_if_not_connected = false;
|
||||
req1.get_config().cancel_on_connection_lost = true;
|
||||
req1.get_config().retry_on_connection_lost = false;
|
||||
req1.get_config().cancel_if_unresponded = true;
|
||||
req1.push("HELLO", 3);
|
||||
req1.push("BLPOP", "any", 0);
|
||||
|
||||
@@ -92,7 +92,7 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
|
||||
resp3::request req2;
|
||||
req2.get_config().cancel_if_not_connected = false;
|
||||
req2.get_config().cancel_on_connection_lost = true;
|
||||
req2.get_config().retry_on_connection_lost= false;
|
||||
req2.get_config().cancel_if_unresponded= true;
|
||||
req2.push("HELLO", 3);
|
||||
req2.push("QUIT");
|
||||
|
||||
|
||||
@@ -168,7 +168,7 @@ BOOST_AUTO_TEST_CASE(reset_before_run_completes)
|
||||
ioc.run();
|
||||
}
|
||||
|
||||
using slave_operation = aedis::detail::guarded_operation<>;
|
||||
using slave_operation = aedis::detail::guarded_operation<net::any_io_executor>;
|
||||
|
||||
auto master(std::shared_ptr<slave_operation> op) -> net::awaitable<void>
|
||||
{
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user