2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-24 18:22:09 +00:00

Compare commits

...

16 Commits

Author SHA1 Message Date
Marcelo Zimbres
63f9b74502 Improves the stress test. 2022-12-18 09:20:29 +01:00
Marcelo Zimbres
801f60a026 Readme improvements. 2022-12-17 22:59:44 +01:00
Marcelo Zimbres
c37fcb641c Documentation improvements. 2022-12-17 16:59:06 +01:00
Marcelo Zimbres
48c3f37168 Test improvements and bugfix in send-retry. 2022-12-11 22:19:37 +01:00
Marcelo Zimbres
3c63911802 Removes some boost dependencies. 2022-12-10 19:42:51 +01:00
Marcelo Zimbres
1645881a44 Doc improvements and add guarded_op class. 2022-12-07 22:32:49 +01:00
Marcelo Zimbres
730e06c38d Adds bigobj and other vs flags. 2022-12-04 20:36:17 +01:00
Marcelo Zimbres
cf3a79737d Removes mingw from windows builds. 2022-12-04 16:15:12 +01:00
Marcelo Zimbres
edb384c843 Build fix on windows. 2022-12-04 15:13:14 +01:00
Marcelo Zimbres
f745faddf8 Replaces boost::string_view with std::string_view. 2022-12-04 13:53:44 +01:00
Marcelo Zimbres
927117568e Build fix. 2022-12-04 13:53:44 +01:00
Marcelo Zimbres
1e7c176f92 Removes dependency on Boost.Hana. 2022-12-03 22:29:04 +01:00
Marcelo Zimbres
449b5f7e7c First steps with windows CI. 2022-12-03 18:14:10 +01:00
Marcelo Zimbres
75f91f3b11 v1.3.1 and build fixes. 2022-12-03 14:34:15 +01:00
Marcelo Zimbres
b9a23568e3 Many improvements in the examples. 2022-12-02 22:58:39 +01:00
Marcelo Zimbres
4ac2509afa Improvements in the docs and examples. 2022-11-27 21:59:02 +01:00
55 changed files with 2986 additions and 1169 deletions

View File

@@ -3,6 +3,92 @@ name: CI
on: [push, pull_request]
jobs:
windows:
name: "${{matrix.generator}} ${{matrix.toolset}} Boost ${{matrix.boost_version}} ${{matrix.build_type}} ${{matrix.name_args}}"
runs-on: ${{matrix.os}}
defaults:
run:
shell: bash
strategy:
fail-fast: false
matrix:
boost_version: ["1.80.0"]
os: [windows-2019, windows-2022]
toolset: [v142, v143]
build_type: [Release]
generator: ["Visual Studio 16 2019", "Visual Studio 17 2022"]
config_args: [""]
build_args: [""]
name_args: [""]
exclude:
- { os: windows-2019, toolset: v143 }
- { os: windows-2019, generator: "Visual Studio 17 2022" }
- { os: windows-2022, generator: "Visual Studio 16 2019" }
# The following combinations are not available through install-boost
- { boost_version: "1.80.0", toolset: v143 }
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Add boost toolset to environment
if: contains(fromJson('["1.80.0"]'), matrix.boost_version)
run: echo BOOST_TOOLSET=$(echo "msvc") >> $GITHUB_ENV
# The platform_version passed to boost-install determines the msvc toolset version for which static libs are installed.
- name: Add boost platform version to environment
run: |
declare -A toolset_to_platform_version=( [v142]="2019" [v143]="2022" )
key=$(echo "${{matrix.toolset}}")
echo BOOST_PLATFORM_VERSION="${toolset_to_platform_version[$key]}" >> $GITHUB_ENV
- name: Add boost install path to environment
run: echo BOOST_INSTALL_PATH="${GITHUB_WORKSPACE}/boost-${{matrix.boost_version}}${BOOST_TOOLSET}${BOOST_PLATFORM_VERSION}" >> $GITHUB_ENV
- name: Add build type configuration to environment
run: echo BUILD_CONFIG_ARG="--config ${{matrix.build_type}}" >> $GITHUB_ENV
- name: Cache Boost installation
id: cache-boost
uses: actions/cache@v3
with:
path: ${{env.BOOST_INSTALL_PATH}}
key: ${{matrix.boost_version}}${{env.BOOST_TOOLSET}}${{env.BOOST_PLATFORM_VERSION}}
- name: Install Boost
if: steps.cache-boost.outputs.cache-hit != 'true'
uses: MarkusJx/install-boost@v2.4.1
with:
boost_version: ${{matrix.boost_version}}
toolset: ${{env.BOOST_TOOLSET}}
boost_install_dir: ${{env.BOOST_INSTALL_PATH}}
platform_version: ${{env.BOOST_PLATFORM_VERSION}}
arch: null
- name: Install packages
run: cinst openssl
- name: Create build directory
run: mkdir build
- name: Configure
working-directory: build
run: |
cmake -T "${{matrix.toolset}}" \
-G "${{matrix.generator}}" \
${{matrix.config_args}} \
${BOOST_COMPILER_ARG}\
"${GITHUB_WORKSPACE}"
env:
BOOST_ROOT: ${{env.BOOST_INSTALL_PATH}}/boost
- name: Build
working-directory: build
run: |
cmake --build . ${BUILD_CONFIG_ARG} ${{matrix.build_args}}
posix:
defaults:
run:
@@ -12,7 +98,6 @@ jobs:
fail-fast: false
matrix:
include:
- { toolset: gcc, compiler: g++-10, install: g++-10, os: ubuntu-22.04, cxxstd: 'c++17' }
- { toolset: gcc, compiler: g++-11, install: g++-11, os: ubuntu-22.04, cxxstd: 'c++17' }
- { toolset: gcc, compiler: g++-11, install: g++-11, os: ubuntu-22.04, cxxstd: 'c++20' }
- { toolset: clang, compiler: clang++-11, install: clang-11, os: ubuntu-22.04, cxxstd: 'c++17' }
@@ -35,7 +120,7 @@ jobs:
uses: MarkusJx/install-boost@v2.3.0
id: install-boost
with:
boost_version: 1.79.0
boost_version: 1.80.0
platform_version: 22.04
- name: Run CMake
run: |

View File

@@ -30,7 +30,7 @@ jobs:
uses: MarkusJx/install-boost@v2.3.0
id: install-boost
with:
boost_version: 1.79.0
boost_version: 1.80.0
platform_version: 22.04
- name: Run CMake
run: |

View File

@@ -10,7 +10,7 @@ cmake_minimum_required(VERSION 3.14)
project(
Aedis
VERSION 1.3.0
VERSION 1.4.0
DESCRIPTION "A redis client designed for performance and scalability"
HOMEPAGE_URL "https://mzimbres.github.io/aedis"
LANGUAGES CXX
@@ -22,28 +22,33 @@ target_include_directories(aedis INTERFACE
$<INSTALL_INTERFACE:include>
)
target_link_libraries(aedis
target_link_libraries(
aedis
INTERFACE
Boost::asio
Boost::assert
Boost::config
Boost::core
Boost::mp11
Boost::optional
Boost::system
Boost::utility
Boost::winapi
)
target_compile_features(aedis INTERFACE cxx_std_17)
# Asio bases C++ feature detection on __cplusplus. Make MSVC
# define it correctly
if (MSVC)
target_compile_options(aedis INTERFACE /Zc:__cplusplus)
endif()
include(CMakePackageConfigHelpers)
write_basic_package_version_file(
"${PROJECT_BINARY_DIR}/AedisConfigVersion.cmake"
COMPATIBILITY AnyNewerVersion
)
find_package(Boost 1.79 REQUIRED)
find_package(Boost 1.80 REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})
find_package(OpenSSL REQUIRED)
@@ -54,103 +59,221 @@ include_directories(include)
# Main function for the examples.
#=======================================================================
add_library(common STATIC examples/common.cpp)
add_library(common STATIC
examples/common/common.cpp
examples/common/main.cpp
examples/common/aedis.cpp
)
target_compile_features(common PUBLIC cxx_std_20)
if (MSVC)
target_compile_options(common PRIVATE /bigobj)
target_compile_definitions(common PRIVATE _WIN32_WINNT=0x0601)
endif()
# Executables
#=======================================================================
#add_executable(intro_sync examples/intro_sync.cpp) // Uncomment after update to Boost 1.80
add_executable(intro examples/intro.cpp)
target_link_libraries(intro common)
target_compile_features(intro PUBLIC cxx_std_20)
add_test(intro intro)
if (MSVC)
target_compile_options(intro PRIVATE /bigobj)
target_compile_definitions(intro PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(intro_sync examples/intro_sync.cpp)
target_compile_features(intro_sync PUBLIC cxx_std_20)
add_test(intro_sync intro_sync)
add_test(intro_sync intro_sync)
if (MSVC)
target_compile_options(intro_sync PRIVATE /bigobj)
target_compile_definitions(intro_sync PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(chat_room examples/chat_room.cpp)
target_compile_features(chat_room PUBLIC cxx_std_20)
target_link_libraries(chat_room common)
if (MSVC)
target_compile_options(chat_room PRIVATE /bigobj)
target_compile_definitions(chat_room PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(containers examples/containers.cpp)
target_compile_features(containers PUBLIC cxx_std_20)
target_link_libraries(containers common)
add_test(containers containers)
if (MSVC)
target_compile_options(containers PRIVATE /bigobj)
target_compile_definitions(containers PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(echo_server examples/echo_server.cpp)
target_compile_features(echo_server PUBLIC cxx_std_20)
target_link_libraries(echo_server common)
if (MSVC)
target_compile_options(echo_server PRIVATE /bigobj)
target_compile_definitions(echo_server PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(resolve_with_sentinel examples/resolve_with_sentinel.cpp)
target_compile_features(resolve_with_sentinel PUBLIC cxx_std_20)
target_link_libraries(resolve_with_sentinel common)
add_test(resolve_with_sentinel resolve_with_sentinel)
#add_test(resolve_with_sentinel resolve_with_sentinel)
if (MSVC)
target_compile_options(resolve_with_sentinel PRIVATE /bigobj)
target_compile_definitions(resolve_with_sentinel PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(serialization examples/serialization.cpp)
target_compile_features(serialization PUBLIC cxx_std_20)
target_link_libraries(serialization common)
add_test(serialization serialization)
if (MSVC)
target_compile_options(serialization PRIVATE /bigobj)
target_compile_definitions(serialization PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(subscriber examples/subscriber.cpp)
target_compile_features(subscriber PUBLIC cxx_std_20)
target_link_libraries(subscriber common)
if (MSVC)
target_compile_options(subscriber PRIVATE /bigobj)
target_compile_definitions(subscriber PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(intro_tls examples/intro_tls.cpp)
target_compile_features(intro_tls PUBLIC cxx_std_20)
add_test(intro_tls intro_tls)
target_link_libraries(intro_tls OpenSSL::Crypto OpenSSL::SSL)
target_link_libraries(intro_tls common)
if (MSVC)
target_compile_options(intro_tls PRIVATE /bigobj)
target_compile_definitions(intro_tls PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(low_level_async examples/low_level_async.cpp)
target_compile_features(low_level_async PUBLIC cxx_std_20)
add_test(low_level_async low_level_async)
target_link_libraries(low_level_async common)
if (MSVC)
target_compile_options(low_level_async PRIVATE /bigobj)
target_compile_definitions(low_level_async PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(echo_server_client benchmarks/cpp/asio/echo_server_client.cpp)
add_executable(echo_server_direct benchmarks/cpp/asio/echo_server_direct.cpp)
add_executable(low_level_sync examples/low_level_sync.cpp)
add_executable(low_level_async examples/low_level_async.cpp)
add_executable(test_conn_exec tests/conn_exec.cpp)
add_executable(test_conn_push tests/conn_push.cpp)
add_executable(test_conn_quit tests/conn_quit.cpp)
add_executable(test_conn_quit_coalesce tests/conn_quit_coalesce.cpp)
add_executable(test_conn_reconnect tests/conn_reconnect.cpp)
add_executable(test_conn_tls tests/conn_tls.cpp)
add_executable(test_low_level tests/low_level.cpp)
add_executable(test_conn_run_cancel tests/conn_run_cancel.cpp)
add_executable(test_conn_exec_cancel tests/conn_exec_cancel.cpp)
add_executable(test_conn_echo_stress tests/conn_echo_stress.cpp)
add_executable(test_request tests/request.cpp)
target_compile_features(echo_server_client PUBLIC cxx_std_20)
if (MSVC)
target_compile_options(echo_server_client PRIVATE /bigobj)
target_compile_definitions(echo_server_client PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(echo_server_direct benchmarks/cpp/asio/echo_server_direct.cpp)
target_compile_features(echo_server_direct PUBLIC cxx_std_20)
if (MSVC)
target_compile_options(echo_server_direct PRIVATE /bigobj)
target_compile_definitions(echo_server_direct PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(low_level_sync examples/low_level_sync.cpp)
target_compile_features(low_level_sync PUBLIC cxx_std_17)
target_compile_features(low_level_async PUBLIC cxx_std_20)
target_compile_features(test_conn_exec PUBLIC cxx_std_20)
target_compile_features(test_conn_push PUBLIC cxx_std_20)
target_compile_features(test_conn_quit PUBLIC cxx_std_17)
target_compile_features(test_conn_quit_coalesce PUBLIC cxx_std_17)
target_compile_features(test_conn_reconnect PUBLIC cxx_std_20)
target_compile_features(test_conn_tls PUBLIC cxx_std_17)
target_compile_features(test_low_level PUBLIC cxx_std_17)
target_compile_features(test_conn_run_cancel PUBLIC cxx_std_20)
target_compile_features(test_conn_exec_cancel PUBLIC cxx_std_20)
target_compile_features(test_conn_echo_stress PUBLIC cxx_std_20)
target_compile_features(test_request PUBLIC cxx_std_17)
target_link_libraries(test_conn_tls OpenSSL::Crypto OpenSSL::SSL)
# Tests
#=======================================================================
#add_test(intro_sync intro_sync)
add_test(low_level_sync low_level_sync)
add_test(low_level_async low_level_async)
add_test(test_low_level test_low_level)
if (MSVC)
target_compile_options(low_level_sync PRIVATE /bigobj)
target_compile_definitions(low_level_sync PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_exec tests/conn_exec.cpp)
target_compile_features(test_conn_exec PUBLIC cxx_std_20)
add_test(test_conn_exec test_conn_exec)
if (MSVC)
target_compile_options(test_conn_exec PRIVATE /bigobj)
target_compile_definitions(test_conn_exec PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_push tests/conn_push.cpp)
target_compile_features(test_conn_push PUBLIC cxx_std_20)
add_test(test_conn_push test_conn_push)
if (MSVC)
target_compile_options(test_conn_push PRIVATE /bigobj)
target_compile_definitions(test_conn_push PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_quit tests/conn_quit.cpp)
target_compile_features(test_conn_quit PUBLIC cxx_std_17)
add_test(test_conn_quit test_conn_quit)
if (MSVC)
target_compile_options(test_conn_quit PRIVATE /bigobj)
target_compile_definitions(test_conn_quit PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_quit_coalesce tests/conn_quit_coalesce.cpp)
add_test(test_conn_quit_coalesce test_conn_quit_coalesce)
target_compile_features(test_conn_quit_coalesce PUBLIC cxx_std_17)
if (MSVC)
target_compile_options(test_conn_quit_coalesce PRIVATE /bigobj)
target_compile_definitions(test_conn_quit_coalesce PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_reconnect tests/conn_reconnect.cpp)
target_compile_features(test_conn_reconnect PUBLIC cxx_std_20)
target_link_libraries(test_conn_reconnect common)
add_test(test_conn_reconnect test_conn_reconnect)
if (MSVC)
target_compile_options(test_conn_reconnect PRIVATE /bigobj)
target_compile_definitions(test_conn_reconnect PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_tls tests/conn_tls.cpp)
add_test(test_conn_tls test_conn_tls)
target_compile_features(test_conn_tls PUBLIC cxx_std_17)
target_link_libraries(test_conn_tls OpenSSL::Crypto OpenSSL::SSL)
if (MSVC)
target_compile_options(test_conn_tls PRIVATE /bigobj)
target_compile_definitions(test_conn_tls PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_low_level tests/low_level.cpp)
target_compile_features(test_low_level PUBLIC cxx_std_17)
add_test(test_low_level test_low_level)
if (MSVC)
target_compile_options(test_low_level PRIVATE /bigobj)
target_compile_definitions(test_low_level PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_run_cancel tests/conn_run_cancel.cpp)
target_compile_features(test_conn_run_cancel PUBLIC cxx_std_20)
add_test(test_conn_run_cancel test_conn_run_cancel)
if (MSVC)
target_compile_options(test_conn_run_cancel PRIVATE /bigobj)
target_compile_definitions(test_conn_run_cancel PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_exec_cancel tests/conn_exec_cancel.cpp)
target_compile_features(test_conn_exec_cancel PUBLIC cxx_std_20)
target_link_libraries(test_conn_exec_cancel common)
add_test(test_conn_exec_cancel test_conn_exec_cancel)
if (MSVC)
target_compile_options(test_conn_exec_cancel PRIVATE /bigobj)
target_compile_definitions(test_conn_exec_cancel PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_conn_echo_stress tests/conn_echo_stress.cpp)
target_compile_features(test_conn_echo_stress PUBLIC cxx_std_20)
target_link_libraries(test_conn_echo_stress common)
add_test(test_conn_echo_stress test_conn_echo_stress)
if (MSVC)
target_compile_options(test_conn_echo_stress PRIVATE /bigobj)
target_compile_definitions(test_conn_echo_stress PRIVATE _WIN32_WINNT=0x0601)
endif()
add_executable(test_request tests/request.cpp)
target_compile_features(test_request PUBLIC cxx_std_17)
add_test(test_request test_request)
if (MSVC)
target_compile_options(test_request PRIVATE /bigobj)
target_compile_definitions(test_request PRIVATE _WIN32_WINNT=0x0601)
endif()
# Install
#=======================================================================

View File

@@ -49,6 +49,7 @@
"CMAKE_BUILD_TYPE": "Debug",
"CMAKE_CXX_EXTENSIONS": "OFF",
"CMAKE_CXX_FLAGS": "-Wall -Wextra -fsanitize=address",
"CMAKE_CXX_COMPILER": "g++-11",
"CMAKE_SHARED_LINKER_FLAGS": "-fsanitize=address",
"CMAKE_CXX_STANDARD_REQUIRED": "ON",
"PROJECT_BINARY_DIR": "${sourceDir}/build/dev",

688
README.md
View File

@@ -1,117 +1,235 @@
# Documentation
[TOC]
# Aedis
## Overview
Aedis is a high-level [Redis](https://redis.io/) client library
built on top of
[Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html).
Some of its distinctive features are
Aedis is a [Redis](https://redis.io/) client library built on top of
[Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html)
that implements the latest version of the Redis communication
protocol
[RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md).
It makes communication with a Redis server easy by hiding most of
the low-level Asio-related code away from the user, which in the majority of
the cases will be concerned with only three library entities
* Support for the latest version of the Redis communication protocol [RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md).
* Support for STL containers, TLS and Redis sentinel.
* Serialization and deserialization of your own data types.
* Back pressure, cancellation and low latency.
* `aedis::connection`: A connection to the Redis server.
* `aedis::resp3::request`: A container of Redis commands.
* `aedis::adapt()`: Adapts data structures to receive responses.
In addition to that, Aedis hides most of the low-level Asio code away
from the user. For example, the coroutine below retrieves Redis hashes
in a `std::map` and quits the connection (see containers.cpp)
For example, the coroutine below uses a short-lived connection to read Redis
[hashes](https://redis.io/docs/data-types/hashes/)
in a `std::map` (see intro.cpp and containers.cpp)
```cpp
auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto async_main() -> net::awaitable<void>
{
// A request contains multiple Redis commands.
request req;
req.get_config().cancel_on_connection_lost = true;
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// From examples/common.hpp to avoid verbosity
co_await connect(conn, "127.0.0.1", "6379");
// A request contains multiple commands.
resp3::request req;
req.push("HELLO", 3);
req.push("HGETALL", "hset-key");
req.push("QUIT");
// Tuple elements will hold the response to each command in the request.
// Responses as tuple elements.
std::tuple<aedis::ignore, std::map<std::string, std::string>, aedis::ignore> resp;
// Executes the request and reads the response.
co_await conn->async_exec(req, adapt(resp));
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
// Uses the map ...
// Use the map from std::get<1>(resp) ...
}
```
The execution of calls to `connection::async_exec` like above are
triggered by the `connection::async_run` member function, which is
required to be running concurrently for as long as the connection
stands. For example, the code below uses a short-lived connection to
execute the coroutine above
The execution of `connection::async_exec` above is composed with
`connection::async_run` with the aid of the Asio awaitable operator ||
that ensures that one operation is cancelled as soon as the other
completes, these functions play the following roles
* `connection::async_exec`: Execute commands (i.e. write the request and reads the response).
* `connection::async_run`: Coordinate read and write operations and remains suspended until the connection is lost.
Let us dig in.
<a name="connection"></a>
## Connection
In general we will want to reuse the same connection for multiple
requests, we can do this with the example above by decoupling the
HELLO command and the call to `async_run` in a separate coroutine
```cpp
net::awaitable<void> async_main()
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
co_await connect(conn, "127.0.0.1", "6379");
resp3::request req;
req.push("HELLO", 3); // Upgrade to RESP3
// Notice we use && instead of || so async_run is not cancelled
// when the response to HELLO comes.
co_await (conn->async_run() && conn->async_exec(req));
}
```
We can now let `run` run detached in the background while other
coroutines perform requests on the connection
```cpp
auto async_main() -> net::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// Resolves and connects (from examples/common.hpp to avoid verbosity)
co_await connect(conn, "127.0.0.1", "6379");
// Calls async_run detached.
net::co_spawn(ex, run(conn), net::detached)
// Runs and executes the request.
co_await (conn->async_run() || hgetall(conn));
// Here we can pass conn around to other coroutines so they can make requests.
...
}
```
Long-lived connections follow the same principle (see the examples
below) and will be discussed in more detail later. The role of the
`async_run` is to coordinate IO and ensure the connection is always
reading from the socket. The rationale behind this design is
With this separation, it is now easy to incorporate other operations
in our application, for example, to cancel the connection on `SIGINT`
and `SIGTERM` we can extend `run` as follows
* Provide quick reaction to disconnections and hence faster failovers.
* Support server pushes and requests in the same connection object,
concurrently.
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
co_await connect(conn, "127.0.0.1", "6379");
signal_set sig{ex, SIGINT, SIGTERM};
In the following sections we will discuss with more details the main
entities Aedis users are concerned with, namely
resp3::request req;
req.push("HELLO", 3);
* `aedis::resp3::request`: A container of Redis commands.
* `aedis::adapt()`: A function that adapts data structures to receive Redis responses.
* `aedis::connection`: A connection to the Redis server.
co_await ((conn->async_run() || sig.async_wait()) && conn->async_exec(req));
}
```
before that however, users might find it helpful to skim over the
examples, to gain a better feeling about the library capabilities
Likewise we can incorporate support for server pushes, health checks and pubsub
* intro.cpp: The Aedis hello-world program. It sends one command to Redis and quits the connection.
* intro_tls.cpp: Same as intro.cpp but over TLS.
* containers.cpp: Shows how to send and receive stl containers and how to use transactions.
* serialization.cpp: Shows how to serialize types using Boost.Json.
* subscriber.cpp: Shows how to implement pubsub that reconnects and resubscribes when the connection is lost.
* echo_server.cpp: A simple TCP echo server.
* chat_room.cpp: A command line chat room built on Redis pubsub.
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
co_await connect(conn, "127.0.0.1", "6379");
signal_set sig{ex, SIGINT, SIGTERM};
resp3::request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel1", "channel2");
co_await ((conn->async_run() || sig.async_wait() || receiver(conn) || healthy_checker(conn))
&& conn->async_exec(req));
}
```
The definition of `receiver` and `healthy_checker` above can be found
in subscriber.cpp. Adding a loop around `async_run` produces a simple
way to support reconnection _while there are pending operations on the connection_,
for example, to reconnect to the same address
```cpp
auto run(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
steady_timer timer{ex};
resp3::request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel1", "channel2");
for (;;) {
co_await connect(conn, "127.0.0.1", "6379");
co_await ((conn->async_run() || healthy_checker(conn) || receiver(conn))
&& conn->async_exec(req));
conn->reset_stream();
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait();
}
}
```
For failover with sentinels see `resolve_with_sentinel.cpp`. At
this point the reasons for why `async_run` was introduced in Aedis
might have become apparent to the reader
* Provide quick reaction to disconnections and hence faster failover.
* Support server pushes and requests in the same connection object, concurrently.
* Separate requests, handling of server pushes and reconnection operations.
### Cancellation
Aedis supports both implicit and explicit cancellation of connection
operations. Explicit cancellation is supported by means of the
`aedis::connection::cancel` member function. Implicit cancellation,
like those that may happen when using Asio awaitable operators && and
|| will be discussed with more detail below.
```cpp
co_await (conn.async_run(...) && conn.async_exec(...))
```
* Provide a simple way to send HELLO and perform channel subscription.
```cpp
co_await (conn.async_run(...) || conn.async_exec(...))
```
* Useful for short-lived connections that are meant to be closed after
a command has been executed.
```cpp
co_await (conn.async_exec(...) || time.async_wait(...))
```
* Provides a way to limit how long the execution of a single request
should last.
* The cancellation will be ignored if the request has already
been written to the socket.
* It is usually a better idea to have a health checker than adding
a per-request timeout; see subscriber.cpp for an example.
```cpp
co_await (conn.async_run(...) || time.async_wait(...))
```
* Sets a limit on how long the connection should live.
```cpp
co_await (conn.async_exec(...) || conn.async_exec(...) || ... || conn.async_exec(...))
```
* This works but is unnecessary. Unless the user has set
`aedis::resp3::request::config::coalesce` to `false`, and he
shouldn't, the connection will automatically merge the individual
requests into a single payload anyway.
<a name="requests"></a>
### Requests
## Requests
Redis requests are composed of one or more Redis commands (in
Redis requests are composed of one or more commands (in the
Redis documentation they are called
[pipelines](https://redis.io/topics/pipelining)). For example
```cpp
// Some example containers.
std::list<std::string> list {...};
std::map<std::string, mystruct> map { ...};
request req;
// Command with variable length of arguments.
req.push("SET", "key", "some value", "EX", "2");
// Pushes a list.
std::list<std::string> list
{"channel1", "channel2", "channel3"};
req.push_range("SUBSCRIBE", list);
// Same as above but as an iterator range.
req.push_range("SUBSCRIBE", std::cbegin(list), std::cend(list));
// Pushes a map.
std::map<std::string, mystruct> map
{ {"key1", "value1"}
, {"key2", "value2"}
, {"key3", "value3"}};
req.push_range("HSET", "key", map);
```
@@ -119,11 +237,11 @@ Sending a request to Redis is performed with `aedis::connection::async_exec` as
<a name="serialization"></a>
#### Serialization
### Serialization
The `push` and `push_range` functions above work with integers
e.g. `int` and `std::string` out of the box. To send your own
data type define a `to_bulk` function like this
The `resp3::request::push` and `resp3::request::push_range` member functions work
with basic data types e.g. `int` and `std::string` out of the box.
To send your own data type define a `to_bulk` function like this
```cpp
// Example struct.
@@ -148,11 +266,17 @@ std::map<std::string, mystruct> map {...};
req.push_range("HSET", "key", map);
```
Example serialization.cpp shows how store json string in Redis.
Example serialization.cpp shows how to store JSON strings in Redis.
<a name="responses"></a>
### Responses
### Config flags
The `aedis::resp3::request::config` object inside the request dictates how the
`aedis::connection` should handle the request in some important situations. The
reader is advised to read it carefully.
## Responses
Aedis uses the following strategy to support Redis responses
@@ -175,10 +299,11 @@ To read the response to this request users can use the following tuple
std::tuple<std::string, int, std::string>
```
The pattern may have become apparent to the user, the tuple must have
the same size as the request (exceptions below) and each element must
be able to store the response to the command it refers to. To ignore
responses to individual commands in the request use the tag
The pattern might have become apparent to the reader: the tuple must
have as many elements as the request has commands (exceptions below).
It is also necessary that each tuple element is capable of storing the
response to the command it refers to, otherwise an error will occur.
To ignore responses to individual commands in the request use the tag
`aedis::ignore`
```cpp
@@ -186,7 +311,8 @@ responses to individual commands in the request use the tag
std::tuple<std::string, aedis::ignore, std::string, aedis::ignore>
```
The following table provides the response types of some commands
The following table provides the resp3-types returned by some Redis
commands
Command | RESP3 type | Documentation
---------|-------------------------------------|--------------
@@ -249,7 +375,11 @@ If the intention is to ignore the response to all commands altogether
use `adapt()` without arguments instead
```cpp
// Uses the ignore adapter explicitly.
co_await conn->async_exec(req, adapt());
// Ignore adapter is also the default argument.
co_await conn->async_exec(req);
```
Responses that contain nested aggregates or heterogeneous data
@@ -258,7 +388,7 @@ of this writing, not all RESP3 types are used by the Redis server,
which means in practice users will be concerned with a reduced
subset of the RESP3 specification.
#### Push
### Pushes
Commands that have push response like
@@ -266,7 +396,7 @@ Commands that have push response like
* `"PSUBSCRIBE"`
* `"UNSUBSCRIBE"`
must be not be included in the tuple. For example, the request below
must **NOT** be included in the tuple. For example, the request below
```cpp
request req;
@@ -278,7 +408,7 @@ req.push("QUIT");
must be read in this tuple `std::tuple<std::string, std::string>`,
that has size two.
#### Null
### Null
It is not uncommon for apps to access keys that do not exist or
that have already expired in the Redis server, to deal with these
@@ -290,19 +420,19 @@ std::tuple<
std::optional<A>,
std::optional<B>,
...
> response;
> resp;
co_await conn->async_exec(req, adapt(response));
co_await conn->async_exec(req, adapt(resp));
```
Everything else stays pretty much the same.
#### Transactions
### Transactions
To read responses to transactions we must first observe that Redis will
queue its commands and send their responses to the user as elements
of an array, after the `EXEC` command comes. For example, to read
the response to this request
queue the transaction commands and send their individual responses as elements
of an array, the array is itself the response to the `EXEC` command.
For example, to read the response to this request
```cpp
req.push("MULTI");
@@ -337,14 +467,14 @@ co_await conn->async_exec(req, adapt(resp));
For a complete example see containers.cpp.
#### Deserialization
### Deserialization
As mentioned in \ref serialization, it is common practice to
serialize data before sending it to Redis e.g. as json strings.
For performance and convenience reasons, we may also want to
deserialize it directly in its final data structure when reading them
back from Redis. Aedis supports this use case by calling a user
provided `from_bulk` function while parsing the response. For example
As mentioned in the serialization section, it is common practice to
serialize data before sending it to Redis e.g. as json strings. For
performance and convenience reasons, we may also want to deserialize
responses directly in their final data structure. Aedis supports this
use case by calling a user provided `from_bulk` function while parsing
the response. For example
```cpp
void from_bulk(mystruct& obj, char const* p, std::size_t size, boost::system::error_code& ec)
@@ -358,7 +488,7 @@ types e.g. `mystruct`, `std::map<std::string, mystruct>` etc.
<a name="the-general-case"></a>
#### The general case
### The general case
There are cases where responses to Redis
commands won't fit in the model presented above, some examples are
@@ -414,211 +544,115 @@ In addition to the above users can also use unordered versions of the
containers. The same reasoning also applies to sets e.g. `SMEMBERS`
and other data structures in general.
<a name="connection"></a>
### Connection
## Examples
The `aedis::connection` is a class that provides async-only
communication with a Redis server by means of three member
functions
These examples demonstrate what has been discussed so far.
* `connection::async_run`: Starts read and write operations and remains suspended until the connection it is lost.
* `connection::async_exec`: Executes commands.
* `connection::async_receive`: Receives server-side pushes.
* intro.cpp: The Aedis hello-world program. Sends one command and quits the connection.
* intro_tls.cpp: Same as intro.cpp but over TLS.
* intro_sync.cpp: Shows how to use the connection class synchronously.
* containers.cpp: Shows how to send and receive STL containers and how to use transactions.
* serialization.cpp: Shows how to serialize types using Boost.Json.
* resolve_with_sentinel.cpp: Shows how to resolve a master address using sentinels.
* subscriber.cpp: Shows how to implement pubsub with reconnection and re-subscription.
* echo_server.cpp: A simple TCP echo server.
* chat_room.cpp: A command line chat built on Redis pubsub.
* low_level_sync.cpp: Sends a ping synchronously using the low-level API.
* low_level_async.cpp: Sends a ping asynchronously using the low-level API.
In general, these operations will be running concurrently in user
application, where, for example
To avoid repetition code that is common to all examples has been
grouped in common.hpp. The main function used in some async examples
has been factored out in the main.cpp file.
1. **Run**: One coroutine will call `async_run`, perhaps in a loop and
with healthy checks.
2. **Execute**: Multiple coroutines will call `async_exec` independently
and without coordination (e.g. queuing).
3. **Receive**: One coroutine will loop on `async_receive` to receive
server-side pushes (required only if the app expects server pushes).
## Echo server benchmark
Each of these operations can be performed without regards to the
others as they are independent from each other. Below we will cover
the points above with more detail.
This document benchmarks the performance of TCP echo servers I
implemented in different languages using different Redis clients. The
main motivations for choosing an echo server are
#### Run
* Simple to implement and does not require expertise level in most languages.
* I/O bound: Echo servers have very low CPU consumption in general
and therefore are excellent to measure how a program handles concurrent requests.
* It simulates very well a typical backend in regard to concurrency.
The code snippet in the overview section has shown us an example that
used `connection::async_run` in short-lived connection, in the general
case however, applications will connect to a Redis server and hang
around for as long as possible, until the connection is lost for some
reason. When that happens, simple setups will want to wait for a
short period of time and try to reconnect. To support this usage
pattern Aedis connections can be reconnected _while there are pending
requests and receive operations_. The general form of a reconnect
loop looks like this (see subscriber.cpp)
I also imposed some constraints on the implementations
```cpp
auto async_main() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
signal_set_type sig{ex, SIGINT, SIGTERM};
timer_type timer{ex};
* It should be simple enough and not require writing too much code.
* Favor the use of standard idioms and avoid optimizations that require expert level.
* Avoid the use of complex things like connection and thread pool.
request req;
req.get_config().cancel_on_connection_lost = true;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
To reproduce these results run one of the echo-server programs in one
terminal and the
[echo-server-client](https://github.com/mzimbres/aedis/blob/42880e788bec6020dd018194075a211ad9f339e8/benchmarks/cpp/asio/echo_server_client.cpp)
in another.
// Loop to reconnect on connection lost. To exit type Ctrl-C twice.
for (;;) {
co_await connect(conn, "127.0.0.1", "6379");
### Without Redis
// Starts async_run and other operations.
co_await ((conn->async_run() || healthy_checker(conn) || sig.async_wait() ||
receiver(conn)) && conn->async_exec(req));
First I tested a pure TCP echo server, i.e. one that sends the messages
directly to the client without interacting with Redis. The result can
be seen below
// Prepare for a reconnect.
conn->reset_stream();
![](https://mzimbres.github.io/aedis/tcp-echo-direct.png)
// Waits some time before reconnecting.
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait();
}
}
```
The tests were performed with a 1000 concurrent TCP connections on the
localhost where latency is 0.07ms on average on my machine. On higher
latency networks the difference among libraries is expected to
decrease.
It is important to emphasize that Redis servers use the old
communication protocol RESP2 by default, therefore it is necessary to
send a `HELLO 3` command everytime a connection is established.
Another common scenario for reconnection is, for example, a failover
with sentinels, covered in the `resolve_with_sentinel.cpp` example.
* I expected Libuv to have similar performance to Asio and Tokio.
* I did expect nodejs to come a little behind given it is
javascript code. Otherwise I did expect it to have similar
performance to libuv since it is the framework behind it.
* Go did surprise me: faster than nodejs and libuv!
#### Execute
The code used in the benchmarks can be found at
The basic idea about `async_exec` was stated above already: execute
Redis commands. One of the most important things about it however is
that it can be called multiple times without coordination, for
example, in a HTTP or Websocket server where each session calls it
independently to communicate with Redis. The benefits of this feature
are manifold
* [Asio](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/cpp/asio/echo_server_direct.cpp): A variation of [this](https://github.com/chriskohlhoff/asio/blob/4915cfd8a1653c157a1480162ae5601318553eb8/asio/src/examples/cpp20/coroutines/echo_server.cpp) Asio example.
* [Libuv](https://github.com/mzimbres/aedis/tree/835a1decf477b09317f391eddd0727213cdbe12b/benchmarks/c/libuv): Taken from [here](https://github.com/libuv/libuv/blob/06948c6ee502862524f233af4e2c3e4ca876f5f6/docs/code/tcp-echo-server/main.c) Libuv example .
* [Tokio](https://github.com/mzimbres/aedis/tree/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/rust/echo_server_direct): Taken from [here](https://docs.rs/tokio/latest/tokio/).
* [Nodejs](https://github.com/mzimbres/aedis/tree/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/nodejs/echo_server_direct)
* [Go](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/go/echo_server_direct.go)
* Reduces code complexity as users won't have to implement queues
every time e.g. different HTTP sessions want to share a connection to Redis.
* A small number of connections improves the performance associated
with [pipelines](https://redis.io/topics/pipelining). A single
connection will be indeed enough in most of cases.
### With Redis
There are some important things about `connection::async_exec` that
are worth stating here
This is similar to the echo server described above but messages are
echoed by Redis and not by the echo-server itself, which acts
as a proxy between the client and the Redis server. The results
can be seen below
* `connection::async_exec` will write a request and read the response
directly in the data structure passed by the user, avoiding
temporaries altogether.
* Requests belonging to different `async_exec` will be coalesced in a single payload
(pipelined) and written only once, improving performance massively.
* Users have full control whether `async_exec` should remain suspended
if a connection is lost, (among other things). See
`aedis::resp3::request::config`.
![](https://mzimbres.github.io/aedis/tcp-echo-over-redis.png)
The code below illustrates these concepts in a TCP session of the
`echo_server.cpp` example
The tests were performed on a network where latency is 35ms on
average, otherwise it uses the same number of TCP connections
as the previous example.
```cpp
auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> db) -> net::awaitable<void>
{
request req;
std::tuple<std::string> response;
As the reader can see, the Libuv and the Rust test are not depicted
in the graph, the reasons are
for (std::string buffer;;) {
// Reads a user message.
auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n");
* [redis-rs](https://github.com/redis-rs/redis-rs): This client
comes so far behind that it can't even be represented together
with the other benchmarks without making them look insignificant.
I don't know for sure why it is so slow, I suppose it has
something to do with its lack of automatic
[pipelining](https://redis.io/docs/manual/pipelining/) support.
In fact, the more TCP connections I launch the worse its
performance gets.
// Echos it through Redis.
req.push("PING", buffer);
co_await conn->async_exec(req, adapt(response));
* Libuv: I left it out because it would require me writing too much
C code. More specifically, I would have to use hiredis and
implement support for pipelines manually.
// Writes it back to the user.
co_await net::async_write(socket, net::buffer(std::get<0>(response)));
The code used in the benchmarks can be found at
// Cleanup
std::get<0>(response).clear();
req.clear();
buffer.erase(0, n);
}
}
```
* [Aedis](https://github.com/mzimbres/aedis): [code](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/examples/echo_server.cpp)
* [node-redis](https://github.com/redis/node-redis): [code](https://github.com/mzimbres/aedis/tree/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/nodejs/echo_server_over_redis)
* [go-redis](https://github.com/go-redis/redis): [code](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/go/echo_server_over_redis.go)
Notice also how the session above provides back-pressure as the
coroutine won't read the next message from the socket until a cycle is
complete.
### Conclusion
#### Receive
Redis clients have to support automatic pipelining to have competitive performance. For updates to this document follow https://github.com/mzimbres/aedis.
Receiving Redis pushes works similar to the `async_exec` discussed
above but without the request. The example below was taken from
subscriber.cpp
```cpp
net::awaitable<void> push_receiver(std::shared_ptr<connection> conn)
{
for (std::vector<node<std::string>> resp;;) {
co_await conn->async_receive(adapt(resp));
print_push(resp);
resp.clear();
}
}
```
In general, it is advisable for all apps to keep a coroutine calling
`async_receive` as an unread push will cause the connection to stall
and eventually timeout. Notice that the same connection that is being
used to send requests can be also used to receive server-side pushes.
#### Cancellation
Aedis supports both implicit and explicit cancellation of connection
operations. Explicit cancellation is supported by means of the
`aedis::connection::cancel` member function. Implicit cancellation,
like those that may happen when using Asio awaitable operators && and
|| will be discussed with more detail below.
```cpp
co_await (conn.async_run(...) && conn.async_exec(...))
```
* Useful when implementing reconnection.
* `async_exec` is responsible for sending the `HELLO` command and
optionally for subscribing to channels.
```cpp
co_await (conn.async_run(...) || conn.async_exec(...))
```
* Useful for short-lived connections that are meant to be closed after
a command has been executed.
```cpp
co_await (conn.async_exec(...) || time.async_wait(...))
```
* Provides a way to limit how long the execution of a single request
should last.
* The cancellation will be ignored if the request has already
been written to the socket.
* It is usually a better idea to have a healthy checker than adding
per-request timeouts, see subscriber.cpp for an example.
```cpp
co_await (conn.async_run(...) || time.async_wait(...))
```
* Sets a limit on how long the connection should live.
```cpp
co_await (conn.async_exec(...) || conn.async_exec(...) || ... || conn.async_exec(...))
```
* This works but is considered an antipattern. Unless
the user has set `aedis::resp3::request::config::coalesce` to
`false`, and he shouldn't, the connection will automatically merge
the individual requests into a single payload anyway.
## Why Aedis
## Comparison
The main reason for why I started writing Aedis was to have a client
compatible with the Asio asynchronous model. As I made progresses I could
@@ -631,6 +665,8 @@ stars, namely
* https://github.com/sewenew/redis-plus-plus
### Aedis vs Redis-plus-plus
Before we start it is important to mention some of the things
redis-plus-plus does not support
@@ -727,96 +763,7 @@ enqueueing a message and triggering a write when it can be sent.
It is also not clear how are pipelines realised with this design
(if at all).
### Echo server benchmark
This document benchmarks the performance of TCP echo servers I
implemented in different languages using different Redis clients. The
main motivations for choosing an echo server are
* Simple to implement and does not require expertise level in most languages.
* I/O bound: Echo servers have very low CPU consumption in general
and therefore are excellent to measure how a program handles concurrent requests.
* It simulates very well a typical backend in regard to concurrency.
I also imposed some constraints on the implementations
* It should be simple enough and not require writing too much code.
* Favor the use of standard idioms and avoid optimizations that require expert level.
* Avoid the use of complex things like connection and thread pool.
To reproduce these results run one of the echo-server programs in one
terminal and the
[echo-server-client](https://github.com/mzimbres/aedis/blob/42880e788bec6020dd018194075a211ad9f339e8/benchmarks/cpp/asio/echo_server_client.cpp)
in another.
#### Without Redis
First I tested a pure TCP echo server, i.e. one that sends the messages
directly to the client without interacting with Redis. The result can
be seen below
![](https://mzimbres.github.io/aedis/tcp-echo-direct.png)
The tests were performed with a 1000 concurrent TCP connections on the
localhost where latency is 0.07ms on average on my machine. On higher
latency networks the difference among libraries is expected to
decrease.
* I expected Libuv to have similar performance to Asio and Tokio.
* I did expect nodejs to come a little behind given it is
javascript code. Otherwise I did expect it to have similar
performance to libuv since it is the framework behind it.
* Go did surprise me: faster than nodejs and libuv!
The code used in the benchmarks can be found at
* [Asio](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/cpp/asio/echo_server_direct.cpp): A variation of [this](https://github.com/chriskohlhoff/asio/blob/4915cfd8a1653c157a1480162ae5601318553eb8/asio/src/examples/cpp20/coroutines/echo_server.cpp) Asio example.
* [Libuv](https://github.com/mzimbres/aedis/tree/835a1decf477b09317f391eddd0727213cdbe12b/benchmarks/c/libuv): Taken from [here](https://github.com/libuv/libuv/blob/06948c6ee502862524f233af4e2c3e4ca876f5f6/docs/code/tcp-echo-server/main.c) Libuv example .
* [Tokio](https://github.com/mzimbres/aedis/tree/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/rust/echo_server_direct): Taken from [here](https://docs.rs/tokio/latest/tokio/).
* [Nodejs](https://github.com/mzimbres/aedis/tree/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/nodejs/echo_server_direct)
* [Go](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/go/echo_server_direct.go)
#### With Redis
This is similar to the echo server described above but messages are
echoed by Redis and not by the echo-server itself, which acts
as a proxy between the client and the Redis server. The results
can be seen below
![](https://mzimbres.github.io/aedis/tcp-echo-over-redis.png)
The tests were performed on a network where latency is 35ms on
average, otherwise it uses the same number of TCP connections
as the previous example.
As the reader can see, the Libuv and the Rust test are not depicted
in the graph, the reasons are
* [redis-rs](https://github.com/redis-rs/redis-rs): This client
comes so far behind that it can't even be represented together
with the other benchmarks without making them look insignificant.
I don't know for sure why it is so slow, I suppose it has
something to do with its lack of automatic
[pipelining](https://redis.io/docs/manual/pipelining/) support.
In fact, the more TCP connections I launch the worse its
performance gets.
* Libuv: I left it out because it would require me writing too much
C code. More specifically, I would have to use hiredis and
implement support for pipelines manually.
The code used in the benchmarks can be found at
* [Aedis](https://github.com/mzimbres/aedis): [code](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/examples/echo_server.cpp)
* [node-redis](https://github.com/redis/node-redis): [code](https://github.com/mzimbres/aedis/tree/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/nodejs/echo_server_over_redis)
* [go-redis](https://github.com/go-redis/redis): [code](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/go/echo_server_over_redis.go)
<a name="api-reference"></a>
#### Conclusion
Redis clients have to support automatic pipelining to have competitive performance. For updates to this document follow https://github.com/mzimbres/aedis.
## Reference
* [High-Level](#high-level-api): Covers the topics discussed in this document.
@@ -831,31 +778,31 @@ library, so you can starting using it right away by adding the
```cpp
#include <aedis/src.hpp>
```
in no more than one source file in your applications. For example, to
compile one of the examples manually
in no more than one source file in your applications. To build the
examples and test cmake is supported, for example
```cpp
g++ -std=c++20 -pthread -I/opt/boost_1_79_0/include/ -Iinclude -Iexamples examples/intro.cpp examples/common.cpp
BOOST_ROOT=/opt/boost_1_80_0 cmake --preset dev
```
The requirements for using Aedis are
- Boost 1.79 or greater.
- Boost 1.80 or greater.
- C++17 minimum.
- Redis 6 or higher (must support RESP3).
- Optionally also redis-cli and Redis Sentinel.
The following compilers are supported
- Tested with gcc: 10, 11, 12.
- Tested with clang: 11, 13, 14.
- Gcc: 10, 11, 12.
- Clang: 11, 13, 14.
- Visual Studio 17 2022, Visual Studio 16 2019.
## Acknowledgement
Acknowledgement to people that helped shape Aedis in one way or
another.
Acknowledgement to people that helped shape Aedis
* Richard Hodges ([madmongo1](https://github.com/madmongo1)): For very helpful support with Asio, the design of asynchronous programs, etc.
* Vinícius dos Santos Oliveira ([vinipsmaker](https://github.com/vinipsmaker)): For useful discussion about how Aedis consumes buffers in the read operation.
@@ -865,11 +812,18 @@ another.
## Changelog
### v1.3.0
### v1.4.0
* Removes dependency on Boost.Hana, boost::string_view, Boost.Variant2 and Boost.Spirit.
* Fixes build and setup CI on windows.
### v1.3.0-1
* Upgrades to Boost 1.80.0
* Removes automatic sending of the `HELLO` command. This can't be
implemented properly without bloating the connection class. It is
now a user responsability to send HELLO. Requests that contain it have
now a user responsibility to send HELLO. Requests that contain it have
priority over other requests and will be moved to the front of the
queue, see `aedis::resp3::request::config`
@@ -982,7 +936,7 @@ another.
is possible in simple reconnection strategies but bloats the class
in more complex scenarios, for example, with sentinel,
authentication and TLS. This is trivial to implement in a separate
coroutine. As a result the enum `event` and `async_receive_event`
coroutine. As a result the `enum event` and `async_receive_event`
have been removed from the class too.
* Fixes a bug in `connection::async_receive_push` that prevented

View File

@@ -14,8 +14,7 @@ using net::ip::tcp;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using timer_type = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
net::awaitable<void>
example(boost::asio::ip::tcp::endpoint ep, std::string msg, int n)
auto example(boost::asio::ip::tcp::endpoint ep, std::string msg, int n) -> net::awaitable<void>
{
try {
auto ex = co_await net::this_coro::executor;

View File

@@ -33,7 +33,7 @@ awaitable_type echo(tcp_socket socket)
std::size_t n = co_await socket.async_read_some(net::buffer(data), use_awaitable);
co_await async_write(socket, net::buffer(data, n), use_awaitable);
}
} catch (std::exception const& e) {
} catch (std::exception const&) {
//std::printf("echo Exception: %s\n", e.what());
}
}

View File

@@ -1250,7 +1250,7 @@ HTML_FILE_EXTENSION = .html
# of the possible markers and block names see the documentation.
# This tag requires that the tag GENERATE_HTML is set to YES.
HTML_HEADER = doc/htmlheader.html
HTML_HEADER =
# The HTML_FOOTER tag can be used to specify a user-defined HTML footer for each
# generated HTML page. If the tag is left blank doxygen will generate a standard
@@ -1260,7 +1260,7 @@ HTML_HEADER = doc/htmlheader.html
# that doxygen normally uses.
# This tag requires that the tag GENERATE_HTML is set to YES.
HTML_FOOTER = doc/htmlfooter.html
HTML_FOOTER =
# The HTML_STYLESHEET tag can be used to specify a user-defined cascading style
# sheet that is used by each HTML page. It can be used to fine-tune the look of
@@ -1595,7 +1595,7 @@ DISABLE_INDEX = YES
# The default value is: NO.
# This tag requires that the tag GENERATE_HTML is set to YES.
GENERATE_TREEVIEW = NO
GENERATE_TREEVIEW = YES
# When both GENERATE_TREEVIEW and DISABLE_INDEX are set to YES, then the
# FULL_SIDEBAR option determines if the side bar is limited to only the treeview

View File

@@ -3,7 +3,7 @@
<!-- Navigation index tabs for HTML output -->
<navindex>
<tab type="mainpage" visible="yes" title="Contents"/>
<tab type="pages" visible="no" title="" intro=""/>
<tab type="pages" visible="yes" title="" intro=""/>
<tab type="modules" visible="no" title="Reference" intro=""/>
<tab type="namespaces" visible="no" title="">
<tab type="namespacelist" visible="yes" title="" intro=""/>

File diff suppressed because it is too large Load Diff

View File

@@ -1,19 +0,0 @@
<!-- HTML footer for doxygen 1.8.14-->
<!-- start footer part -->
<!--BEGIN GENERATE_TREEVIEW-->
<div id="nav-path" class="navpath"><!-- id is needed for treeview function! -->
<ul>
$navpath
<li class="footer">
Aedis 1.0.0 - Reference Guide generated on $datetime using Doxygen $doxygenversion &#160;&#160;
<img class="footer" src="rootlogo_s.gif" alt="root"/></li>
</ul>
</div>
<!--END GENERATE_TREEVIEW-->
<!--BEGIN !GENERATE_TREEVIEW-->
<hr class="footer"/><address class="footer">
Author: Marcelo Zimbres Silva.
</address>
<!--END !GENERATE_TREEVIEW-->
</body>
</html>

View File

@@ -1,34 +0,0 @@
<!-- HTML header for doxygen 1.8.14-->
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/xhtml;charset=UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=9"/>
<meta name="generator" content="Doxygen $doxygenversion"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<!--BEGIN PROJECT_NAME--><title>$projectname: $title</title><!--END PROJECT_NAME-->
<!--BEGIN !PROJECT_NAME--><title>$title</title><!--END !PROJECT_NAME-->
<link href="$relpath^tabs.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="$relpath^jquery.js"></script>
<script type="text/javascript" src="$relpath^dynsections.js"></script>
$search
<link href="$relpath^$stylesheet" rel="stylesheet" type="text/css" />
$extrastylesheet
</head>
<body>
<div id="top"><!-- do not remove this div, it is closed by doxygen! -->
<!--BEGIN TITLEAREA-->
<div id="titlearea">
<table bgcolor="#346295" cellspacing="0" cellpadding="6">
<tbody>
<tr>
<td valign="middle" style="color: #FFFFFF" nowrap="nowrap"><font size="6">$projectname $projectnumber</font> &#160; <br> $projectbrief </td>
<td style="width:100%"> $searchbox </td>
</tr>
</tbody>
</table>
</div>
<!--END TITLEAREA-->
<!-- end header part -->

View File

@@ -5,28 +5,29 @@
*/
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT) && defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <iostream>
namespace net = boost::asio;
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <aedis.hpp>
#include <unistd.h>
#include "common.hpp"
#include "common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using stream_descriptor = net::use_awaitable_t<>::as_default_on_t<net::posix::stream_descriptor>;
using signal_set_type = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using signal_set = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using aedis::adapt;
using aedis::resp3::request;
using aedis::resp3::node;
// Chat over Redis pubsub. To test, run this program from different
// Chat over Redis pubsub. To test, run this program from multiple
// terminals and type messages to stdin.
// Receives Redis server-side pushes.
// Receives Redis pushes.
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
for (std::vector<node<std::string>> resp;;) {
for (std::vector<resp3::node<std::string>> resp;;) {
co_await conn->async_receive(adapt(resp));
std::cout << resp.at(1).value << " " << resp.at(2).value << " " << resp.at(3).value << std::endl;
resp.clear();
@@ -38,33 +39,35 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
{
for (std::string msg;;) {
auto n = co_await net::async_read_until(*in, net::dynamic_buffer(msg, 1024), "\n");
request req;
resp3::request req;
req.push("PUBLISH", "chat-channel", msg);
co_await conn->async_exec(req);
msg.erase(0, n);
}
}
auto subscriber(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
request req;
req.get_config().cancel_on_connection_lost = true;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "chat-channel");
co_await conn->async_exec(req);
}
// Called from the main function (see main.cpp)
auto async_main() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto stream = std::make_shared<stream_descriptor>(ex, ::dup(STDIN_FILENO));
signal_set_type sig{ex, SIGINT, SIGTERM};
signal_set sig{ex, SIGINT, SIGTERM};
resp3::request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "chat-channel");
co_await connect(conn, "127.0.0.1", "6379");
co_await ((conn->async_run() || publisher(stream, conn) || receiver(conn) ||
healthy_checker(conn) || sig.async_wait()) && subscriber(conn));
healthy_checker(conn) || sig.async_wait()) && conn->async_exec(req));
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT) && defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
#else // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
auto async_main() -> net::awaitable<void>
{
std::cout << "Requires support for posix streams." << std::endl;
co_return;
}
#endif // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -0,0 +1,8 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <aedis.hpp>
#include <aedis/src.hpp>

View File

@@ -6,6 +6,7 @@
#include "common.hpp"
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
@@ -18,9 +19,6 @@ using aedis::resp3::request;
using aedis::adapt;
using aedis::operation;
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace
{
auto redir(boost::system::error_code& ec)
@@ -31,7 +29,6 @@ auto healthy_checker(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
try {
request req;
req.get_config().cancel_on_connection_lost = true;
req.push("PING");
timer_type timer{co_await net::this_coro::executor};
@@ -74,21 +71,23 @@ connect(
throw std::runtime_error("Connect timeout");
}
extern net::awaitable<void> async_main();
// Main function used in our examples.
auto main() -> int
auto run(net::awaitable<void> op) -> int
{
try {
net::io_context ioc;
net::co_spawn(ioc, async_main(), net::detached);
net::co_spawn(ioc, std::move(op), [](std::exception_ptr p) {
if (p)
std::rethrow_exception(p);
});
ioc.run();
return 0;
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 1;
}
#else // defined(BOOST_ASIO_HAS_CO_AWAIT)
auto main() -> int {std::cout << "Requires coroutine support." << std::endl; return 0;}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -28,5 +28,7 @@ connect(
auto healthy_checker(std::shared_ptr<connection> conn) -> boost::asio::awaitable<void>;
auto run(boost::asio::awaitable<void> op) -> int;
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)
#endif // AEDIS_EXAMPLES_COMMON_HPP

30
examples/common/main.cpp Normal file
View File

@@ -0,0 +1,30 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include "common.hpp"
extern boost::asio::awaitable<void> async_main();
auto main() -> int
{
return run(async_main());
}
#else // defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <iostream>
auto main() -> int
{
std::cout << "Requires coroutine support." << std::endl;
return 0;
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -11,12 +11,12 @@
#include <map>
#include <vector>
#include "common.hpp"
#include "common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using aedis::adapt;
using aedis::resp3::request;
void print(std::map<std::string, std::string> const& cont)
{
@@ -31,45 +31,58 @@ void print(std::vector<int> const& cont)
}
// Stores the content of some STL containers in Redis.
auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto store() -> net::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// Resolves and connects (from examples/common.hpp to avoid verbosity)
co_await connect(conn, "127.0.0.1", "6379");
std::vector<int> vec
{1, 2, 3, 4, 5, 6};
std::map<std::string, std::string> map
{{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}};
request req;
req.get_config().cancel_on_connection_lost = true;
resp3::request req;
req.push("HELLO", 3);
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);
req.push("QUIT");
co_await conn->async_exec(req);
co_await (conn->async_run() || conn->async_exec(req));
}
// Retrieves a Redis hash as an std::map.
auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto hgetall() -> net::awaitable<std::map<std::string, std::string>>
{
request req;
req.get_config().cancel_on_connection_lost = true;
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// From examples/common.hpp to avoid vebosity
co_await connect(conn, "127.0.0.1", "6379");
// A request contains multiple commands.
resp3::request req;
req.push("HELLO", 3);
req.push("HGETALL", "hset-key");
req.push("QUIT");
// Responses as tuple elements.
std::tuple<aedis::ignore, std::map<std::string, std::string>, aedis::ignore> resp;
co_await conn->async_exec(req, adapt(resp));
print(std::get<1>(resp));
// Executes the request and reads the response.
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
co_return std::get<1>(resp);
}
// Retrieves in a transaction.
auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto transaction() -> net::awaitable<void>
{
request req;
req.get_config().cancel_on_connection_lost = true;
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// Resolves and connects (from examples/common.hpp to avoid vebosity)
co_await connect(conn, "127.0.0.1", "6379");
resp3::request req;
req.push("HELLO", 3);
req.push("MULTI");
req.push("LRANGE", "rpush-key", 0, -1); // Retrieves
@@ -86,26 +99,19 @@ auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
aedis::ignore // quit
> resp;
co_await conn->async_exec(req, adapt(resp));
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));
print(std::get<0>(std::get<4>(resp)).value());
print(std::get<1>(std::get<4>(resp)).value());
}
// Called from the main function (see main.cpp)
net::awaitable<void> async_main()
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
// Uses short-lived connections to store and retrieve the
// containers.
co_await connect(conn, "127.0.0.1", "6379");
co_await (conn->async_run() || store(conn));
co_await connect(conn, "127.0.0.1", "6379");
co_await (conn->async_run() || hgetall(conn));
co_await connect(conn, "127.0.0.1", "6379");
co_await (conn->async_run() || transaction(conn));
co_await store();
co_await transaction();
auto const map = co_await hgetall();
print(map);
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -8,19 +8,19 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <aedis.hpp>
#include "common.hpp"
#include "common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
using signal_set_type = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using signal_set = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using aedis::adapt;
using aedis::resp3::request;
auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> net::awaitable<void>
{
request req;
resp3::request req;
std::string resp;
for (std::string buffer;;) {
@@ -35,6 +35,7 @@ auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) ->
}
}
// Listens for tcp connections.
auto listener(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
@@ -43,14 +44,14 @@ auto listener(std::shared_ptr<connection> conn) -> net::awaitable<void>
net::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), net::detached);
}
// Called from the main function (see main.cpp)
auto async_main() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
signal_set_type sig{ex, SIGINT, SIGTERM};
signal_set sig{ex, SIGINT, SIGTERM};
request req;
req.get_config().cancel_on_connection_lost = true;
resp3::request req;
req.push("HELLO", 3);
co_await connect(conn, "127.0.0.1", "6379");

View File

@@ -8,17 +8,17 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <aedis.hpp>
#include "common.hpp"
#include "common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using aedis::adapt;
using aedis::resp3::request;
net::awaitable<void> async_main()
// Called from the main function (see main.cpp)
auto async_main() -> net::awaitable<void>
{
request req;
req.get_config().cancel_on_connection_lost = true;
resp3::request req;
req.push("HELLO", 3);
req.push("PING", "Hello world");
req.push("QUIT");

View File

@@ -7,26 +7,25 @@
#include <tuple>
#include <string>
#include <thread>
#include <iostream>
#include <boost/asio.hpp>
#include <aedis.hpp>
// TODO: Fix this after updating to 1.80.
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::adapt;
using aedis::resp3::request;
using connection = aedis::connection<>;
using connection = aedis::connection;
template <class Adapter>
auto exec(connection& conn, request const& req, Adapter adapter, boost::system::error_code& ec)
auto exec(std::shared_ptr<connection> conn, resp3::request const& req, Adapter adapter)
{
net::dispatch(
conn.get_executor(),
net::deferred([&]() { return conn.async_exec(req, adapter, net::deferred); }))
(net::redirect_error(net::use_future, ec)).get();
conn->get_executor(),
net::deferred([&]() { return conn->async_exec(req, adapter, net::deferred); }))
(net::use_future).get();
}
auto logger = [](auto const& ec)
@@ -37,25 +36,24 @@ int main()
try {
net::io_context ioc{1};
connection conn{ioc};
std::thread t{[&]() {
conn.async_run(logger);
auto conn = std::make_shared<connection>(ioc);
net::ip::tcp::resolver resv{ioc};
auto const res = resv.resolve("127.0.0.1", "6379");
net::connect(conn->next_layer(), res);
std::thread t{[conn, &ioc]() {
conn->async_run(logger);
ioc.run();
}};
request req;
req.get_config().cancel_on_connection_lost = true;
resp3::request req;
req.push("HELLO", 3);
req.push("PING");
req.push("QUIT");
boost::system::error_code ec;
std::tuple<std::string, aedis::ignore> resp;
exec(conn, req, adapt(resp), ec);
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
exec(conn, req, adapt(resp));
std::cout
<< "Exec: " << ec.message() << "\n"
<< "Response: " << std::get<0>(resp) << std::endl;
std::cout << "Response: " << std::get<1>(resp) << std::endl;
t.join();
} catch (std::exception const& e) {

View File

@@ -17,10 +17,10 @@
#include <aedis/ssl/connection.hpp>
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using resolver = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::resolver>;
using aedis::adapt;
using aedis::resp3::request;
using connection = net::use_awaitable_t<>::as_default_on_t<aedis::ssl::connection>;
auto verify_certificate(bool, net::ssl::verify_context&) -> bool
@@ -31,8 +31,7 @@ auto verify_certificate(bool, net::ssl::verify_context&) -> bool
net::awaitable<void> async_main()
{
request req;
req.get_config().cancel_on_connection_lost = true;
resp3::request req;
req.push("HELLO", 3, "AUTH", "aedis", "aedis");
req.push("PING");
req.push("QUIT");

View File

@@ -4,32 +4,32 @@
* accompanying file LICENSE.txt)
*/
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <aedis.hpp>
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <aedis.hpp>
#include <aedis/src.hpp>
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using endpoints = net::ip::tcp::resolver::results_type;
using resolver = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::resolver>;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using aedis::resp3::request;
using aedis::adapter::adapt2;
using net::ip::tcp;
net::awaitable<void> ping(endpoints const& addrs)
auto async_main() -> net::awaitable<void>
{
tcp_socket socket{co_await net::this_coro::executor};
net::connect(socket, addrs);
auto ex = co_await net::this_coro::executor;
resolver resv{ex};
auto const addrs = co_await resv.async_resolve("127.0.0.1", "6379");
tcp_socket socket{ex};
co_await net::async_connect(socket, addrs);
// Creates the request and writes to the socket.
request req;
resp3::request req;
req.push("HELLO", 3);
req.push("PING");
req.push("PING", "Hello world");
req.push("QUIT");
co_await resp3::async_write(socket, req);
@@ -45,19 +45,4 @@ net::awaitable<void> ping(endpoints const& addrs)
std::cout << "Ping: " << resp << std::endl;
}
int main()
{
try {
net::io_context ioc;
net::ip::tcp::resolver resv{ioc};
auto const addrs = resv.resolve("127.0.0.1", "6379");
net::co_spawn(ioc, ping(addrs), net::detached);
ioc.run();
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
}
#else // defined(BOOST_ASIO_HAS_CO_AWAIT)
auto main() -> int {std::cout << "Requires coroutine support." << std::endl; return 0;}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -14,24 +14,21 @@
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::resp3::request;
using aedis::adapter::adapt2;
using net::ip::tcp;
int main()
{
try {
net::io_context ioc;
tcp::resolver resv{ioc};
net::ip::tcp::resolver resv{ioc};
auto const res = resv.resolve("127.0.0.1", "6379");
tcp::socket socket{ioc};
net::ip::tcp::socket socket{ioc};
net::connect(socket, res);
// Creates the request and writes to the socket.
request req;
resp3::request req;
req.push("HELLO", 3);
req.push("PING");
req.push("PING", "Hello world");
req.push("QUIT");
resp3::write(socket, req);

View File

View File

@@ -8,18 +8,19 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <aedis.hpp>
#include "common.hpp"
#include "common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using endpoints = net::ip::tcp::resolver::results_type;
using aedis::adapt;
using aedis::resp3::request;
auto redir(boost::system::error_code& ec)
{ return net::redirect_error(net::use_awaitable, ec); }
struct endpoint {
struct address {
std::string host;
std::string port;
};
@@ -27,10 +28,9 @@ struct endpoint {
// For more info see
// - https://redis.io/docs/manual/sentinel.
// - https://redis.io/docs/reference/sentinel-clients.
auto resolve_master_address(std::vector<endpoint> const& endpoints) -> net::awaitable<endpoint>
auto resolve_master_address(std::vector<address> const& endpoints) -> net::awaitable<address>
{
request req;
req.get_config().cancel_on_connection_lost = true;
resp3::request req;
req.push("SENTINEL", "get-master-addr-by-name", "mymaster");
req.push("QUIT");
@@ -43,17 +43,17 @@ auto resolve_master_address(std::vector<endpoint> const& endpoints) -> net::awai
co_await (conn->async_run() && conn->async_exec(req, adapt(addr), redir(ec)));
conn->reset_stream();
if (std::get<0>(addr))
co_return endpoint{std::get<0>(addr).value().at(0), std::get<0>(addr).value().at(1)};
co_return address{std::get<0>(addr).value().at(0), std::get<0>(addr).value().at(1)};
}
co_return endpoint{};
co_return address{};
}
auto async_main() -> net::awaitable<void>
{
// A list of sentinel addresses from which only one is responsive
// to simulate sentinels that are down.
std::vector<endpoint> const endpoints
std::vector<address> const endpoints
{ {"foo", "26379"}
, {"bar", "26379"}
, {"127.0.0.1", "26379"}

View File

@@ -7,31 +7,48 @@
#include <boost/asio.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#include <boost/asio/experimental/awaitable_operators.hpp>
#define BOOST_JSON_NO_LIB
#define BOOST_CONTAINER_NO_LIB
#include <boost/json.hpp>
#include <aedis.hpp>
#include "common.hpp"
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <set>
#include <iterator>
#include <string>
#include "common/common.hpp"
// Include this in no more than one .cpp file.
#include <boost/json/src.hpp>
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using aedis::resp3::request;
using aedis::adapt;
using namespace boost::json;
using aedis::adapt;
struct user {
std::string name;
std::string age;
std::string country;
friend auto operator<(user const& a, user const& b)
{
return std::tie(a.name, a.age, a.country) < std::tie(b.name, b.age, b.country);
}
friend auto operator<<(std::ostream& os, user const& u) -> std::ostream&
{
os << "Name: " << u.name << "\n"
<< "Age: " << u.age << "\n"
<< "Country: " << u.country;
return os;
}
};
// Boost.Json serialization.
void tag_invoke(value_from_tag, value& jv, user const& u)
{
jv =
@@ -42,7 +59,7 @@ void tag_invoke(value_from_tag, value& jv, user const& u)
}
template<class T>
void extract(object const& obj, T& t, boost::string_view key)
void extract(object const& obj, T& t, std::string_view key)
{
t = value_to<T>(obj.at(key));
}
@@ -57,40 +74,24 @@ auto tag_invoke(value_to_tag<user>, value const& jv)
return u;
}
// Serializes
// Aedis serialization
void to_bulk(std::pmr::string& to, user const& u)
{
aedis::resp3::to_bulk(to, serialize(value_from(u)));
}
// Deserializes
void from_bulk(user& u, boost::string_view sv, boost::system::error_code&)
void from_bulk(user& u, std::string_view sv, boost::system::error_code&)
{
value jv = parse(sv);
u = value_to<user>(jv);
}
auto operator<<(std::ostream& os, user const& u) -> std::ostream&
{
os << "Name: " << u.name << "\n"
<< "Age: " << u.age << "\n"
<< "Country: " << u.country;
return os;
}
auto operator<(user const& a, user const& b)
{
return std::tie(a.name, a.age, a.country) < std::tie(b.name, b.age, b.country);
}
net::awaitable<void> async_main()
{
std::set<user> users
{{"Joao", "58", "Brazil"} , {"Serge", "60", "France"}};
request req;
req.get_config().cancel_on_connection_lost = true;
resp3::request req;
req.push("HELLO", 3);
req.push_range("SADD", "sadd-key", users); // Sends
req.push("SMEMBERS", "sadd-key"); // Retrieves
@@ -99,6 +100,7 @@ net::awaitable<void> async_main()
std::tuple<aedis::ignore, int, std::set<user>, std::string> resp;
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
co_await connect(conn, "127.0.0.1", "6379");
co_await (conn->async_run() || conn->async_exec(req, adapt(resp)));

View File

@@ -9,16 +9,14 @@
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <aedis.hpp>
#include "common.hpp"
#include "common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using namespace net::experimental::awaitable_operators;
using resolver = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::resolver>;
using signal_set_type = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using timer_type = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
using signal_set = net::use_awaitable_t<>::as_default_on_t<net::signal_set>;
using steady_timer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
using aedis::adapt;
using aedis::resp3::request;
using aedis::resp3::node;
/* This example will subscribe and read pushes indefinitely.
*
@@ -39,7 +37,7 @@ using aedis::resp3::node;
// Receives pushes.
auto receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
for (std::vector<node<std::string>> resp;;) {
for (std::vector<resp3::node<std::string>> resp;;) {
co_await conn->async_receive(adapt(resp));
std::cout << resp.at(1).value << " " << resp.at(2).value << " " << resp.at(3).value << std::endl;
resp.clear();
@@ -50,11 +48,10 @@ auto async_main() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
signal_set_type sig{ex, SIGINT, SIGTERM};
timer_type timer{ex};
signal_set sig{ex, SIGINT, SIGTERM};
steady_timer timer{ex};
request req;
req.get_config().cancel_on_connection_lost = true;
resp3::request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
@@ -63,6 +60,7 @@ auto async_main() -> net::awaitable<void>
co_await connect(conn, "127.0.0.1", "6379");
co_await ((conn->async_run() || healthy_checker(conn) || sig.async_wait() ||
receiver(conn)) && conn->async_exec(req));
conn->reset_stream();
timer.expires_after(std::chrono::seconds{1});
co_await timer.async_wait();

View File

@@ -9,10 +9,10 @@
#include <tuple>
#include <limits>
#include <string_view>
#include <variant>
#include <boost/mp11.hpp>
#include <boost/variant2.hpp>
#include <boost/utility/string_view.hpp>
#include <boost/system.hpp>
#include <aedis/resp3/node.hpp>
@@ -44,7 +44,7 @@ public:
void
operator()(
std::size_t, resp3::node<boost::string_view> const&, boost::system::error_code&) { }
std::size_t, resp3::node<std::string_view> const&, boost::system::error_code&) { }
[[nodiscard]]
auto get_supported_response_size() const noexcept
@@ -63,7 +63,7 @@ class static_adapter {
private:
static constexpr auto size = std::tuple_size<Tuple>::value;
using adapter_tuple = boost::mp11::mp_transform<adapter::adapter_t, Tuple>;
using variant_type = boost::mp11::mp_rename<adapter_tuple, boost::variant2::variant>;
using variant_type = boost::mp11::mp_rename<adapter_tuple, std::variant>;
using adapters_array_type = std::array<variant_type, size>;
adapters_array_type adapters_;
@@ -87,10 +87,10 @@ public:
void
operator()(
std::size_t i,
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
using boost::variant2::visit;
using std::visit;
// I am usure whether this should be an error or an assertion.
BOOST_ASSERT(i < adapters_.size());
visit([&](auto& arg){arg(nd, ec);}, adapters_.at(i));
@@ -122,7 +122,7 @@ public:
void
operator()(
std::size_t,
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
adapter_(nd, ec);
@@ -164,7 +164,7 @@ class wrapper {
public:
explicit wrapper(Adapter adapter) : adapter_{adapter} {}
void operator()(resp3::node<boost::string_view> const& node, boost::system::error_code& ec)
void operator()(resp3::node<std::string_view> const& node, boost::system::error_code& ec)
{ return adapter_(0, node, ec); }
[[nodiscard]]
@@ -210,7 +210,7 @@ inline auto adapt(std::size_t max_read_size = (std::numeric_limits<std::size_t>:
* 2. std::vector<node<String>>
*
* The types T1, T2, etc can be any STL container, any integer type
* and \c std::string
* and `std::string`.
*
* @param t Tuple containing the responses.
* @param max_read_size Specifies the maximum size of the read

View File

@@ -18,11 +18,10 @@
#include <deque>
#include <vector>
#include <array>
#include <string_view>
#include <charconv>
#include <boost/assert.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/home/x3.hpp>
#include <boost/utility/string_view.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/type.hpp>
@@ -32,54 +31,35 @@
namespace aedis::adapter::detail {
inline
auto parse_double(
char const* data,
std::size_t size,
boost::system::error_code& ec) -> double
{
static constexpr boost::spirit::x3::real_parser<double> p{};
double ret = 0;
if (!parse(data, data + size, p, ret))
ec = error::not_a_double;
return ret;
}
// Serialization.
template <class T>
auto from_bulk(
T& i,
boost::string_view sv,
boost::system::error_code& ec) -> typename std::enable_if<std::is_integral<T>::value, void>::type
auto from_bulk(T& i, std::string_view sv, boost::system::error_code& ec) -> typename std::enable_if<std::is_integral<T>::value, void>::type
{
i = resp3::detail::parse_uint(sv.data(), sv.size(), ec);
auto const res = std::from_chars(sv.data(), sv.data() + std::size(sv), i);
if (res.ec != std::errc())
ec = error::not_a_number;
}
inline
void from_bulk(
bool& t,
boost::string_view sv,
boost::system::error_code&)
void from_bulk(bool& t, std::string_view sv, boost::system::error_code&)
{
t = *sv.data() == 't';
}
inline
void from_bulk(
double& d,
boost::string_view sv,
boost::system::error_code& ec)
void from_bulk(double& d, std::string_view sv, boost::system::error_code& ec)
{
d = parse_double(sv.data(), sv.size(), ec);
auto const res = std::from_chars(sv.data(), sv.data() + std::size(sv), d);
if (res.ec != std::errc())
ec = error::not_a_double;
}
template <class CharT, class Traits, class Allocator>
void
from_bulk(
std::basic_string<CharT, Traits, Allocator>& s,
boost::string_view sv,
std::string_view sv,
boost::system::error_code&)
{
s.append(sv.data(), sv.size());
@@ -105,7 +85,7 @@ private:
public:
explicit general_aggregate(Result* c = nullptr): result_(c) {}
void operator()(resp3::node<boost::string_view> const& n, boost::system::error_code&)
void operator()(resp3::node<std::string_view> const& n, boost::system::error_code&)
{
result_->push_back({n.data_type, n.aggregate_size, n.depth, std::string{std::cbegin(n.value), std::cend(n.value)}});
}
@@ -119,7 +99,7 @@ private:
public:
explicit general_simple(Node* t = nullptr) : result_(t) {}
void operator()(resp3::node<boost::string_view> const& n, boost::system::error_code& ec)
void operator()(resp3::node<std::string_view> const& n, boost::system::error_code& ec)
{
result_->data_type = n.data_type;
result_->aggregate_size = n.aggregate_size;
@@ -137,7 +117,7 @@ public:
void
operator()(
Result& result,
resp3::node<boost::string_view> const& n,
resp3::node<std::string_view> const& n,
boost::system::error_code& ec)
{
set_on_resp3_error(n.data_type, ec);
@@ -165,7 +145,7 @@ public:
void
operator()(
Result& result,
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
set_on_resp3_error(nd.data_type, ec);
@@ -204,7 +184,7 @@ public:
void
operator()(
Result& result,
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
set_on_resp3_error(nd.data_type, ec);
@@ -246,7 +226,7 @@ public:
void
operator()(
Result& result,
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
set_on_resp3_error(nd.data_type, ec);
@@ -274,7 +254,7 @@ public:
void
operator()(
Result& result,
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
set_on_resp3_error(nd.data_type, ec);
@@ -313,7 +293,7 @@ struct list_impl {
void
operator()(
Result& result,
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
set_on_resp3_error(nd.data_type, ec);
@@ -388,7 +368,7 @@ public:
void
operator()(
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
BOOST_ASSERT(result_);
@@ -407,7 +387,7 @@ public:
void
operator()(
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
if (nd.data_type == resp3::type::null)

View File

@@ -9,9 +9,10 @@
#include <vector>
#include <tuple>
#include <string_view>
#include <variant>
#include <boost/mp11.hpp>
#include <boost/variant2.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/type.hpp>
@@ -74,7 +75,7 @@ struct assigner {
template <class T1, class T2>
static void assign(T1& dest, T2& from)
{
dest[N] = internal_adapt(std::get<N>(from));
dest[N].template emplace<N>(internal_adapt(std::get<N>(from)));
assigner<N - 1>::assign(dest, from);
}
};
@@ -84,7 +85,7 @@ struct assigner<0> {
template <class T1, class T2>
static void assign(T1& dest, T2& from)
{
dest[0] = internal_adapt(std::get<0>(from));
dest[0].template emplace<0>(internal_adapt(std::get<0>(from)));
}
};
@@ -96,7 +97,7 @@ private:
boost::mp11::mp_rename<
boost::mp11::mp_transform<
adapter_t, Tuple>,
boost::variant2::variant>,
std::variant>,
std::tuple_size<Tuple>::value>;
std::size_t i_ = 0;
@@ -109,7 +110,7 @@ public:
detail::assigner<std::tuple_size<Tuple>::value - 1>::assign(adapters_, *r);
}
void count(resp3::node<boost::string_view> const& nd)
void count(resp3::node<std::string_view> const& nd)
{
if (nd.depth == 1) {
if (is_aggregate(nd.data_type))
@@ -126,10 +127,10 @@ public:
void
operator()(
resp3::node<boost::string_view> const& nd,
resp3::node<std::string_view> const& nd,
boost::system::error_code& ec)
{
using boost::variant2::visit;
using std::visit;
if (nd.depth == 0) {
auto const real_aggr_size = nd.aggregate_size * element_multiplicity(nd.data_type);

View File

@@ -18,9 +18,8 @@ namespace aedis {
/** @brief A connection to the Redis server.
* @ingroup high-level-api
*
* This class keeps a healthy connection to the Redis instance where
* commands can be sent at any time. For more details, please see the
* documentation of each individual function.
* For more details, please see the documentation of each individual
* function.
*
* @tparam AsyncReadWriteStream A stream that supports reading and
* writing.
@@ -47,7 +46,7 @@ public:
using base_type = detail::connection_base<executor_type, basic_connection<AsyncReadWriteStream>>;
/// Constructor
/// Contructs from an executor.
explicit
basic_connection(
executor_type ex,
@@ -56,6 +55,7 @@ public:
, stream_{ex}
{}
/// Contructs from a context.
explicit
basic_connection(
boost::asio::io_context& ioc,
@@ -82,10 +82,11 @@ public:
/// Returns a const reference to the next layer.
auto next_layer() const noexcept -> auto const& { return stream_; }
/** @brief Establishes a connection with the Redis server asynchronously.
/** @brief Starts read and write operations
*
* This function will start reading from the socket and executes
* all requests that have been started prior to this function
* This function starts read and write operations with the Redis
* server. More specifically it will trigger the write of all
* requests i.e. calls to `async_exec` that happened prior to this
* call.
*
* @param token Completion token.
@@ -96,9 +97,9 @@ public:
* void f(boost::system::error_code);
* @endcode
*
* This function will complete when the connection is lost as
* follows. If the error is boost::asio::error::eof this function
* will complete without error.
* This function will complete when the connection is lost. If the
* error is boost::asio::error::eof this function will complete
* without error.
*/
template <class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_run(CompletionToken token = CompletionToken{})
@@ -108,11 +109,11 @@ public:
/** @brief Executes a command on the Redis server asynchronously.
*
* This function will send a request to the Redis server and
* complete when the response arrives. If the request contains
* only commands that don't expect a response, the completion
* occurs after it has been written to the underlying stream.
* Multiple concurrent calls to this function will be
* This function sends a request to the Redis server and
* complete after the response has been processed. If the request
* contains only commands that don't expect a response, the
* completion occurs after it has been written to the underlying
* stream. Multiple concurrent calls to this function will be
* automatically queued by the implementation.
*
* @param req Request object.
@@ -144,7 +145,7 @@ public:
*
* Users that expect server pushes should call this function in a
* loop. If a push arrives and there is no reader, the connection
* will hang and eventually timeout.
* will hang.
*
* @param adapter The response adapter.
* @param token The Asio completion token.
@@ -177,12 +178,7 @@ public:
* @li operation::run: Cancels the `async_run` operation. Notice
* that the preferred way to close a connection is to send a
* [QUIT](https://redis.io/commands/quit/) command to the server.
* An unresponsive Redis server will also cause the idle-checks to
* timeout and lead to `connection::async_run` completing with
* `error::idle_timeout`. Calling `cancel(operation::run)`
* directly should be seen as the last option.
* @li operation::receive: Cancels any ongoing callto
* `async_receive`.
* @li operation::receive: Cancels any ongoing calls to * `async_receive`.
*
* @param op: The operation to be cancelled.
* @returns The number of operations that have been canceled.
@@ -196,7 +192,6 @@ private:
template <class, class> friend class detail::connection_base;
template <class, class> friend struct detail::exec_read_op;
template <class, class> friend struct detail::exec_op;
template <class, class> friend struct detail::receive_op;
template <class> friend struct detail::reader_op;
template <class> friend struct detail::writer_op;
template <class> friend struct detail::run_op;

View File

@@ -19,6 +19,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <aedis/adapt.hpp>
@@ -47,7 +48,7 @@ public:
connection_base(executor_type ex, std::pmr::memory_resource* resource)
: writer_timer_{ex}
, read_timer_{ex}
, push_channel_{ex}
, guarded_op_{ex}
, read_buffer_{resource}
, write_buffer_{resource}
, reqs_{resource}
@@ -76,7 +77,7 @@ public:
}
case operation::receive:
{
push_channel_.cancel();
guarded_op_.cancel();
return 1U;
}
default: BOOST_ASSERT(false); return 0;
@@ -103,16 +104,19 @@ public:
return ret;
}
// Remove requests that have the flag cancel_if_not_sent_when_connection_lost set
auto cancel_on_conn_lost() -> std::size_t
{
// Must return false if the request should be removed.
auto cond = [](auto const& ptr)
{
BOOST_ASSERT(ptr != nullptr);
if (ptr->get_request().get_config().cancel_on_connection_lost)
return false;
return !(!ptr->get_request().get_config().retry && ptr->is_written());
if (ptr->is_written()) {
return ptr->get_request().get_config().retry_on_connection_lost;
} else {
return !ptr->get_request().get_config().cancel_on_connection_lost;
}
};
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
@@ -127,6 +131,7 @@ public:
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return ptr->reset_status();
});
return ret;
}
@@ -154,10 +159,10 @@ public:
CompletionToken token = CompletionToken{})
{
auto f = detail::make_adapter_wrapper(adapter);
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::receive_op<Derived, decltype(f)>{&derived(), f}, token, writer_timer_);
return guarded_op_.async_wait(
resp3::async_read(derived().next_layer(), make_dynamic_buffer(adapter.get_max_read_size(0)), f, boost::asio::deferred),
std::move(token));
}
template <class CompletionToken>
@@ -173,9 +178,6 @@ private:
using clock_type = std::chrono::steady_clock;
using clock_traits_type = boost::asio::wait_traits<clock_type>;
using timer_type = boost::asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
using resolver_type = boost::asio::ip::basic_resolver<boost::asio::ip::tcp, executor_type>;
using push_channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, std::size_t)>;
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
auto derived() -> Derived& { return static_cast<Derived&>(*this); }
@@ -276,7 +278,6 @@ private:
using reqs_type = std::pmr::deque<std::shared_ptr<req_info>>;
template <class, class> friend struct detail::receive_op;
template <class> friend struct detail::reader_op;
template <class> friend struct detail::writer_op;
template <class> friend struct detail::run_op;
@@ -368,10 +369,12 @@ private:
}
}
// IO objects
// Notice we use a timer to simulate a condition-variable. It is
// also more suitable than a channel and the notify operation does
// not suspend.
timer_type writer_timer_;
timer_type read_timer_;
push_channel_type push_channel_;
detail::guarded_operation<executor_type> guarded_op_;
std::pmr::string read_buffer_;
std::pmr::string write_buffer_;

View File

@@ -9,6 +9,7 @@
#include <array>
#include <algorithm>
#include <string_view>
#include <boost/assert.hpp>
#include <boost/system.hpp>
@@ -18,7 +19,7 @@
#include <aedis/adapt.hpp>
#include <aedis/error.hpp>
#include <aedis/detail/net.hpp>
#include <aedis/detail/guarded_operation.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/resp3/read.hpp>
@@ -29,46 +30,6 @@
namespace aedis::detail {
template <class Conn, class Adapter>
struct receive_op {
Conn* conn = nullptr;
Adapter adapter;
std::size_t read_size = 0;
boost::asio::coroutine coro{};
template <class Self>
void
operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
{
yield conn->push_channel_.async_receive(std::move(self));
AEDIS_CHECK_OP1();
yield
resp3::async_read(
conn->next_layer(),
conn->make_dynamic_buffer(adapter.get_max_read_size(0)),
adapter, std::move(self));
// cancel(receive) is needed to cancel the channel, otherwise
// the read operation will be blocked forever see
// test_push_adapter.
AEDIS_CHECK_OP1(conn->cancel(operation::run); conn->cancel(operation::receive));
read_size = n;
yield conn->push_channel_.async_send({}, 0, std::move(self));
AEDIS_CHECK_OP1();
self.complete({}, read_size);
return;
}
}
};
template <class Conn, class Adapter>
struct exec_read_op {
Conn* conn;
@@ -101,15 +62,14 @@ struct exec_read_op {
conn->next_layer(),
conn->make_dynamic_buffer(),
"\r\n", std::move(self));
AEDIS_CHECK_OP1(conn->cancel(operation::run));
AEDIS_CHECK_OP1(conn->cancel(operation::run););
}
// If the next request is a push we have to handle it to
// the receive_op wait for it to be done and continue.
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push) {
yield
async_send_receive(conn->push_channel_, std::move(self));
AEDIS_CHECK_OP1(conn->cancel(operation::run));
yield conn->guarded_op_.async_run(std::move(self));
AEDIS_CHECK_OP1(conn->cancel(operation::run););
continue;
}
//-----------------------------------
@@ -118,12 +78,12 @@ struct exec_read_op {
resp3::async_read(
conn->next_layer(),
conn->make_dynamic_buffer(adapter.get_max_read_size(index)),
[i = index, adpt = adapter] (resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable { adpt(i, nd, ec); },
[i = index, adpt = adapter] (resp3::node<std::string_view> const& nd, boost::system::error_code& ec) mutable { adpt(i, nd, ec); },
std::move(self));
++index;
AEDIS_CHECK_OP1(conn->cancel(operation::run));
AEDIS_CHECK_OP1(conn->cancel(operation::run););
read_size += n;
@@ -201,14 +161,7 @@ EXEC_OP_WAIT:
BOOST_ASSERT(conn->cmds_ != 0);
yield
conn->async_exec_read(adapter, conn->reqs_.front()->get_number_of_commands(), std::move(self));
if (is_cancelled(self)) {
conn->remove_request(info);
return self.complete(boost::asio::error::operation_aborted, {});
}
if (ec) {
return self.complete(ec, {});
}
AEDIS_CHECK_OP1(;);
read_size = n;
@@ -285,7 +238,7 @@ struct writer_op {
conn->coalesce_requests();
yield
boost::asio::async_write(conn->next_layer(), boost::asio::buffer(conn->write_buffer_), std::move(self));
AEDIS_CHECK_OP0(conn->cancel(operation::run));
AEDIS_CHECK_OP0(conn->cancel(operation::run););
conn->on_write();
@@ -335,7 +288,7 @@ struct reader_op {
return self.complete({}); // EOFINAE: EOF is not an error.
}
AEDIS_CHECK_OP0(conn->cancel(operation::run));
AEDIS_CHECK_OP0(conn->cancel(operation::run););
// We handle unsolicited events in the following way
//
@@ -358,25 +311,20 @@ struct reader_op {
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push
|| conn->reqs_.empty()
|| (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)) {
yield async_send_receive(conn->push_channel_, std::move(self));
if (!conn->is_open() || ec || is_cancelled(self)) {
conn->cancel(operation::run);
self.complete(boost::asio::error::basic_errors::operation_aborted);
return;
}
yield conn->guarded_op_.async_run(std::move(self));
} else {
BOOST_ASSERT(conn->cmds_ != 0);
BOOST_ASSERT(!conn->reqs_.empty());
BOOST_ASSERT(conn->reqs_.front()->get_number_of_commands() != 0);
conn->reqs_.front()->proceed();
yield conn->read_timer_.async_wait(std::move(self));
if (!conn->is_open() || is_cancelled(self)) {
// Added this cancel here to make sure any outstanding
// ping is cancelled.
conn->cancel(operation::run);
self.complete(boost::asio::error::basic_errors::operation_aborted);
return;
}
ec = {};
}
if (!conn->is_open() || ec || is_cancelled(self)) {
conn->cancel(operation::run);
self.complete(boost::asio::error::basic_errors::operation_aborted);
return;
}
}
}

View File

@@ -0,0 +1,108 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_DETAIL_GUARDED_OPERATION_HPP
#define AEDIS_DETAIL_GUARDED_OPERATION_HPP
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/yield.hpp>
namespace aedis::detail {
template <class Executor>
struct send_receive_op {
using channel_type = boost::asio::experimental::channel<Executor, void(boost::system::error_code, std::size_t)>;
channel_type* channel;
boost::asio::coroutine coro{};
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
{
reenter (coro)
{
yield channel->async_send(boost::system::error_code{}, 0, std::move(self));
AEDIS_CHECK_OP0(;);
yield channel->async_send(boost::system::error_code{}, 0, std::move(self));
AEDIS_CHECK_OP0(;);
self.complete({});
}
}
};
template <class Executor, class Op>
struct wait_op {
using channel_type = boost::asio::experimental::channel<Executor, void(boost::system::error_code, std::size_t)>;
channel_type* channel;
Op op;
std::size_t res = 0;
boost::asio::coroutine coro{};
template <class Self>
void
operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro)
{
yield channel->async_receive(std::move(self));
AEDIS_CHECK_OP1(;);
yield std::move(op)(std::move(self));
AEDIS_CHECK_OP1(channel->cancel(););
res = n;
yield channel->async_receive(std::move(self));
AEDIS_CHECK_OP1(;);
self.complete({}, res);
return;
}
}
};
template <class Executor = boost::asio::any_io_executor>
class guarded_operation {
public:
using executor_type = Executor;
guarded_operation(executor_type ex) : channel_{ex} {}
template <class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_run(CompletionToken&& token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(send_receive_op<executor_type>{&channel_}, token, channel_);
}
template <class Op, class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_wait(Op&& op, CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(wait_op<executor_type, Op>{&channel_, std::move(op)}, token, channel_);
}
void cancel() {channel_.cancel();}
private:
using channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, std::size_t)>;
template <class> friend struct send_receive_op;
template <class, class> friend struct wait_op;
channel_type channel_;
};
} // aedis::detail
#include <boost/asio/unyield.hpp>
#endif // AEDIS_DETAIL_GUARDED_OPERATION_HPP

View File

@@ -1,62 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_NET_HPP
#define AEDIS_NET_HPP
#include <array>
#include <boost/system.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/connect.hpp>
#include <boost/assert.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/yield.hpp>
namespace aedis::detail {
template <class Channel>
struct send_receive_op {
Channel* channel;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t = 0)
{
reenter (coro)
{
yield
channel->async_send(boost::system::error_code{}, 0, std::move(self));
AEDIS_CHECK_OP1();
yield
channel->async_receive(std::move(self));
AEDIS_CHECK_OP1();
self.complete({}, 0);
}
}
};
template <
class Channel,
class CompletionToken =
boost::asio::default_completion_token_t<typename Channel::executor_type>
>
auto async_send_receive(Channel& channel, CompletionToken&& token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(send_receive_op<Channel>{&channel}, token, channel);
}
} // aedis::detail
#include <boost/asio/unyield.hpp>
#endif // AEDIS_NET_HPP

View File

@@ -1,25 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/home/x3.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/resp3/type.hpp>
namespace aedis::resp3::detail {
auto parse_uint(char const* data, std::size_t size, boost::system::error_code& ec) -> std::size_t
{
static constexpr boost::spirit::x3::uint_parser<std::size_t, 10> p{};
std::size_t ret = 0;
if (!parse(data, data + size, p, ret))
ec = error::not_a_number;
return ret;
}
} // aedis::resp3::detail

View File

@@ -10,21 +10,31 @@
#include <array>
#include <limits>
#include <system_error>
#include <charconv>
#include <string_view>
#include <cstdint>
#include <boost/assert.hpp>
#include <boost/utility/string_view.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/node.hpp>
namespace aedis::resp3::detail {
auto parse_uint(char const* data, std::size_t size, boost::system::error_code& ec) -> std::size_t;
using int_type = std::uint64_t;
inline
void to_int(int_type& i, std::string_view sv, boost::system::error_code& ec)
{
auto const res = std::from_chars(sv.data(), sv.data() + std::size(sv), i);
if (res.ec != std::errc())
ec = error::not_a_number;
}
template <class ResponseAdapter>
class parser {
private:
using node_type = node<boost::string_view>;
using node_type = node<std::string_view>;
static constexpr std::size_t max_embedded_depth = 5;
ResponseAdapter adapter_;
@@ -40,7 +50,7 @@ private:
std::array<std::size_t, max_embedded_depth + 1> sizes_ = {{1}};
// Contains the length expected in the next bulk read.
std::size_t bulk_length_ = (std::numeric_limits<std::size_t>::max)();
int_type bulk_length_ = (std::numeric_limits<unsigned long>::max)();
// The type of the next bulk. Contains type::invalid if no bulk is
// expected.
@@ -83,7 +93,7 @@ public:
switch (t) {
case type::streamed_string_part:
{
bulk_length_ = parse_uint(data + 1, n - 2, ec);
to_int(bulk_length_ , std::string_view{data + 1, n - 3}, ec);
if (ec)
return 0;
@@ -106,7 +116,7 @@ public:
// 0.
sizes_[++depth_] = (std::numeric_limits<std::size_t>::max)();
} else {
bulk_length_ = parse_uint(data + 1, n - 2, ec);
to_int(bulk_length_ , std::string_view{data + 1, n - 3} , ec);
if (ec)
return 0;
@@ -169,7 +179,8 @@ public:
case type::attribute:
case type::map:
{
auto const l = parse_uint(data + 1, n - 2, ec);
int_type l = -1;
to_int(l, std::string_view{data + 1, n - 3}, ec);
if (ec)
return 0;

View File

@@ -7,12 +7,12 @@
#ifndef AEDIS_RESP3_READ_OPS_HPP
#define AEDIS_RESP3_READ_OPS_HPP
#include <string_view>
#include <boost/assert.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/utility/string_view.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <boost/asio/yield.hpp>
@@ -28,14 +28,14 @@ auto is_cancelled(T const& self)
#define AEDIS_CHECK_OP0(X)\
if (ec || aedis::detail::is_cancelled(self)) {\
X;\
X\
self.complete(!!ec ? ec : boost::asio::error::operation_aborted);\
return;\
}
#define AEDIS_CHECK_OP1(X)\
if (ec || aedis::detail::is_cancelled(self)) {\
X;\
X\
self.complete(!!ec ? ec : boost::asio::error::operation_aborted, {});\
return;\
}
@@ -43,7 +43,7 @@ auto is_cancelled(T const& self)
namespace aedis::resp3::detail {
struct ignore_response {
void operator()(node<boost::string_view> nd, boost::system::error_code& ec)
void operator()(node<std::string_view> nd, boost::system::error_code& ec)
{
switch (nd.data_type) {
case resp3::type::simple_error: ec = error::resp3_simple_error; return;
@@ -82,7 +82,7 @@ public:
if (parser_.bulk() == type::invalid) {
yield
boost::asio::async_read_until(stream_, buf_, "\r\n", std::move(self));
AEDIS_CHECK_OP1();
AEDIS_CHECK_OP1(;);
} else {
// On a bulk read we can't read until delimiter since the
// payload may contain the delimiter itself so we have to
@@ -101,7 +101,7 @@ public:
buf_.data(buffer_size_, parser_.bulk_length() + 2 - buffer_size_),
boost::asio::transfer_all(),
std::move(self));
AEDIS_CHECK_OP1();
AEDIS_CHECK_OP1(;);
}
n = parser_.bulk_length() + 2;

View File

@@ -4,11 +4,12 @@
* accompanying file LICENSE.txt)
*/
#include <string_view>
#include <aedis/resp3/request.hpp>
namespace aedis::resp3::detail {
auto has_push_response(boost::string_view cmd) -> bool
auto has_push_response(std::string_view cmd) -> bool
{
if (cmd == "SUBSCRIBE") return true;
if (cmd == "PSUBSCRIBE") return true;
@@ -16,9 +17,4 @@ auto has_push_response(boost::string_view cmd) -> bool
return false;
}
auto is_hello(boost::string_view cmd) -> bool
{
return cmd == "HELLO";
}
} // aedis::resp3::detail

View File

@@ -9,9 +9,6 @@
#include <aedis/resp3/type.hpp>
#include <string>
#include <vector>
namespace aedis::resp3 {
/** \brief A node in the response tree.
@@ -38,27 +35,6 @@ struct node {
String value{};
};
/** @brief Converts the node to a string.
* @relates node
*
* @param in The node object.
*/
template <class String>
auto to_string(node<String> const& in)
{
std::string out;
out += std::to_string(in.depth);
out += '\t';
out += to_string(in.data_type);
out += '\t';
out += std::to_string(in.aggregate_size);
out += '\t';
if (!is_aggregate(in.data_type))
out.append(in.value.data(), in.value.size());
return out;
}
/** @brief Compares a node for equality.
* @relates node
*
@@ -74,21 +50,6 @@ auto operator==(node<String> const& a, node<String> const& b)
&& a.value == b.value;
};
/** @brief Writes the node string to the stream.
* @relates node
*
* @param os Output stream.
* @param node Node object.
*
* \remark Binary data is not converted to text.
*/
template <class String>
auto operator<<(std::ostream& os, node<String> const& node) -> std::ostream&
{
os << to_string(node);
return os;
}
} // aedis::resp3
#endif // AEDIS_RESP3_NODE_HPP

View File

@@ -10,10 +10,6 @@
#include <string>
#include <tuple>
#include <memory_resource>
#include <boost/hana.hpp>
#include <boost/utility/string_view.hpp>
#include <aedis/resp3/type.hpp>
// NOTE: Consider detecting tuples in the type in the parameter pack
@@ -46,7 +42,7 @@ constexpr char const* separator = "\r\n";
* See more in @ref serialization.
*/
template <class Request>
void to_bulk(Request& to, boost::string_view data)
void to_bulk(Request& to, std::string_view data)
{
auto const str = std::to_string(data.size());
@@ -61,14 +57,12 @@ template <class Request, class T, typename = typename std::enable_if<std::is_int
void to_bulk(Request& to, T n)
{
auto const s = std::to_string(n);
to_bulk(to, boost::string_view{s});
to_bulk(to, std::string_view{s});
}
namespace detail {
auto has_push_response(boost::string_view cmd) -> bool;
auto is_hello(boost::string_view cmd) -> bool;
auto has_push_response(std::string_view cmd) -> bool;
template <class T>
struct add_bulk_impl {
@@ -80,6 +74,21 @@ struct add_bulk_impl {
}
};
template <class ...Ts>
struct add_bulk_impl<std::tuple<Ts...>> {
template <class Request>
static void add(Request& to, std::tuple<Ts...> const& t)
{
auto f = [&](auto const&... vs)
{
using namespace aedis::resp3;
(to_bulk(to, vs), ...);
};
std::apply(f, t);
}
};
template <class U, class V>
struct add_bulk_impl<std::pair<U, V>> {
template <class Request>
@@ -91,23 +100,6 @@ struct add_bulk_impl<std::pair<U, V>> {
}
};
template <class ...Ts>
struct add_bulk_impl<boost::hana::tuple<Ts...>> {
template <class Request>
static void add(Request& to, boost::hana::tuple<Ts...> const& from)
{
using boost::hana::for_each;
// Fold expressions is C++17 so we use hana.
//(detail::add_bulk(*request_, args), ...);
for_each(from, [&](auto const& e) {
using namespace aedis::resp3;
to_bulk(to, e);
});
}
};
template <class Request>
void add_header(Request& to, type t, std::size_t size)
{
@@ -138,7 +130,7 @@ struct bulk_counter<std::pair<T, U>> {
};
template <class Request>
void add_blob(Request& to, boost::string_view blob)
void add_blob(Request& to, std::string_view blob)
{
to.append(std::cbegin(blob), std::cend(blob));
to += separator;
@@ -171,43 +163,41 @@ void add_separator(Request& to)
*
* \li Non-string types will be converted to string by using \c
* to_bulk, which must be made available over ADL.
* \li Uses std::string as internal storage.
* \li Uses a std::pmr::string for internal storage.
*/
class request {
public:
/// Request configuration options.
struct config {
/** \brief If set to true, requests started with
* `aedis::connection::async_exec` will fail if the connection is
* lost while the request is pending. The default
* behaviour is not to close requests.
/** \brief Setting it to true will cause
* `aedis::connection::async_exec` to complete with error if the
* connection is lost. Affects only requests that haven't been
* sent yet.
*/
bool cancel_on_connection_lost = false;
bool cancel_on_connection_lost = true;
/** \brief If true this request will be coalesced with other requests,
* see https://redis.io/topics/pipelining. If false, this
* request will be sent individually.
/** \brief If true the request will be coalesced with other
* requests, see https://redis.io/topics/pipelining. Otherwise
* the request is sent individually.
*/
bool coalesce = true;
/** \brief If set to true, requests started with
* `aedis::connection::async_exec` will fail if the call happens
* before the connection with Redis was stablished.
/** \brief If true, the request will complete with error if the
* call happens before the connection with Redis was established.
*/
bool cancel_if_not_connected = false;
/** \brief If true, the implementation will resend this
* request if it remained unresponded when
* `aedis::connection::async_run` completed. Has effect only if
* cancel_on_connection_lost is true.
/** \brief If true `aedis::connection::async_exec` will not
* cancel this request if the connection is lost. Affects only
* requests that have been written to the socket but remained
* unresponded when `aedis::connection::async_run` completed.
*/
bool retry = true;
bool retry_on_connection_lost = false;
/** \brief If this request has a HELLO command and this flag is
* set to true, the `aedis::connection` will move it to the
* front of the queue of awaiting requests. This makes it
* possible to send HELLO and authenticate before other
* commands are sent.
* true, the `aedis::connection` will move it to the front of
* the queue of awaiting requests. This makes it possible to
* send HELLO and authenticate before other commands are sent.
*/
bool hello_with_priority = true;
};
@@ -218,7 +208,7 @@ public:
* \param resource Memory resource.
*/
explicit
request(config cfg = config{false, true, false, true, true},
request(config cfg = config{true, true, false, false, true},
std::pmr::memory_resource* resource = std::pmr::get_default_resource())
: cfg_{cfg}, payload_(resource) {}
@@ -239,7 +229,7 @@ public:
commands_ = 0;
}
/// Calls std::string::reserve on the internal storage.
/// Calls std::pmr::string::reserve on the internal storage.
void reserve(std::size_t new_cap = 0)
{ payload_.reserve(new_cap); }
@@ -265,16 +255,14 @@ public:
* \param args Command arguments.
*/
template <class... Ts>
void push(boost::string_view cmd, Ts const&... args)
void push(std::string_view cmd, Ts const&... args)
{
using boost::hana::for_each;
using boost::hana::make_tuple;
using resp3::type;
auto constexpr pack_size = sizeof...(Ts);
detail::add_header(payload_, type::array, 1 + pack_size);
detail::add_bulk(payload_, cmd);
detail::add_bulk(payload_, make_tuple(args...));
detail::add_bulk(payload_, std::tie(std::forward<Ts const&>(args)...));
check_cmd(cmd);
}
@@ -301,7 +289,7 @@ public:
* \param end Iterator to the end of the range.
*/
template <class Key, class ForwardIterator>
void push_range(boost::string_view cmd, Key const& key, ForwardIterator begin, ForwardIterator end,
void push_range(std::string_view cmd, Key const& key, ForwardIterator begin, ForwardIterator end,
typename std::iterator_traits<ForwardIterator>::value_type * = nullptr)
{
using value_type = typename std::iterator_traits<ForwardIterator>::value_type;
@@ -340,7 +328,7 @@ public:
* \param end Iterator to the end of the range.
*/
template <class ForwardIterator>
void push_range(boost::string_view cmd, ForwardIterator begin, ForwardIterator end,
void push_range(std::string_view cmd, ForwardIterator begin, ForwardIterator end,
typename std::iterator_traits<ForwardIterator>::value_type * = nullptr)
{
using value_type = typename std::iterator_traits<ForwardIterator>::value_type;
@@ -369,7 +357,7 @@ public:
* \param range Range to send e.g. and \c std::map.
*/
template <class Key, class Range>
void push_range(boost::string_view cmd, Key const& key, Range const& range,
void push_range(std::string_view cmd, Key const& key, Range const& range,
decltype(std::begin(range)) * = nullptr)
{
using std::begin;
@@ -385,7 +373,7 @@ public:
* \param range Range to send e.g. and \c std::map.
*/
template <class Range>
void push_range(boost::string_view cmd, Range const& range,
void push_range(std::string_view cmd, Range const& range,
decltype(std::begin(range)) * = nullptr)
{
using std::begin;
@@ -394,12 +382,13 @@ public:
}
private:
void check_cmd(boost::string_view cmd)
void check_cmd(std::string_view cmd)
{
if (!detail::has_push_response(cmd))
++commands_;
has_hello_priority_ = detail::is_hello(cmd) && cfg_.hello_with_priority;
if (cmd == "HELLO")
has_hello_priority_ = cfg_.hello_with_priority;
}
config cfg_;

View File

@@ -7,4 +7,3 @@
#include <aedis/impl/error.ipp>
#include <aedis/resp3/impl/request.ipp>
#include <aedis/resp3/impl/type.ipp>
#include <aedis/resp3/detail/impl/parser.ipp>

View File

@@ -139,7 +139,6 @@ private:
template <class, class> friend class aedis::detail::connection_base;
template <class, class> friend struct aedis::detail::exec_op;
template <class, class> friend struct aedis::detail::exec_read_op;
template <class, class> friend struct detail::receive_op;
template <class> friend struct aedis::detail::run_op;
template <class> friend struct aedis::detail::writer_op;
template <class> friend struct aedis::detail::reader_op;

View File

@@ -15,3 +15,9 @@ resolve(
net::ip::tcp::resolver resv{ioc};
return resv.resolve(host, port);
}
#ifdef BOOST_ASIO_HAS_CO_AWAIT
inline
auto redir(boost::system::error_code& ec)
{ return net::redirect_error(net::use_awaitable, ec); }
#endif // BOOST_ASIO_HAS_CO_AWAIT

View File

@@ -8,27 +8,20 @@
#include <boost/asio.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
#include <boost/system/errc.hpp>
#define BOOST_TEST_MODULE low level
#include <boost/test/included/unit_test.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
#include "common.hpp"
#include "../examples/common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using error_code = boost::system::error_code;
using aedis::resp3::request;
using aedis::operation;
using aedis::adapt;
using connection = net::use_awaitable_t<>::as_default_on_t<aedis::connection>;
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace net::experimental::awaitable_operators;
net::awaitable<void> push_consumer(std::shared_ptr<connection> conn, int expected)
auto push_consumer(std::shared_ptr<connection> conn, int expected) -> net::awaitable<void>
{
int c = 0;
for (;;) {
@@ -37,7 +30,7 @@ net::awaitable<void> push_consumer(std::shared_ptr<connection> conn, int expecte
break;
}
request req;
resp3::request req;
req.push("HELLO", 3);
req.push("QUIT");
co_await conn->async_exec(req, adapt());
@@ -47,7 +40,7 @@ auto echo_session(std::shared_ptr<connection> conn, std::string id, int n) -> ne
{
auto ex = co_await net::this_coro::executor;
request req;
resp3::request req;
std::tuple<aedis::ignore, std::string> resp;
for (auto i = 0; i < n; ++i) {
@@ -57,8 +50,8 @@ auto echo_session(std::shared_ptr<connection> conn, std::string id, int n) -> ne
req.push("PING", msg);
req.push("SUBSCRIBE", "channel");
boost::system::error_code ec;
co_await conn->async_exec(req, adapt(resp), net::redirect_error(net::use_awaitable, ec));
BOOST_TEST(!ec);
co_await conn->async_exec(req, adapt(resp), redir(ec));
BOOST_CHECK_EQUAL(ec, boost::system::error_code{});
BOOST_CHECK_EQUAL(msg, std::get<1>(resp));
req.clear();
std::get<1>(resp).clear();
@@ -70,8 +63,8 @@ auto async_echo_stress() -> net::awaitable<void>
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
int const sessions = 1000;
int const msgs = 100;
int const sessions = 500;
int const msgs = 1000;
int total = sessions * msgs;
net::co_spawn(ex, push_consumer(conn, total), net::detached);
@@ -79,16 +72,13 @@ auto async_echo_stress() -> net::awaitable<void>
for (int i = 0; i < sessions; ++i)
net::co_spawn(ex, echo_session(conn, std::to_string(i), msgs), net::detached);
auto const addrs = resolve();
co_await net::async_connect(conn->next_layer(), addrs);
co_await connect(conn, "127.0.0.1", "6379");
co_await conn->async_run();
}
BOOST_AUTO_TEST_CASE(echo_stress)
{
net::io_context ioc;
net::co_spawn(ioc, async_echo_stress(), net::detached);
ioc.run();
run(async_echo_stress());
}
#else

View File

@@ -20,15 +20,72 @@
// been already writen.
namespace net = boost::asio;
using aedis::resp3::request;
using aedis::adapt;
using connection = aedis::connection;
namespace resp3 = aedis::resp3;
using error_code = boost::system::error_code;
using connection = aedis::connection;
using aedis::adapt;
BOOST_AUTO_TEST_CASE(hello_priority)
{
resp3::request req1;
req1.get_config().coalesce = false;
req1.push("PING", "req1");
resp3::request req2;
req2.get_config().coalesce = false;
req2.get_config().hello_with_priority = false;
req2.push("HELLO", 3);
req2.push("PING", "req2");
req2.push("QUIT");
resp3::request req3;
req3.get_config().coalesce = false;
req3.get_config().hello_with_priority = true;
req3.push("HELLO", 3);
req3.push("PING", "req3");
net::io_context ioc;
auto const endpoints = resolve();
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
bool seen1 = false;
bool seen2 = false;
bool seen3 = false;
conn.async_exec(req1, adapt(), [&](auto ec, auto){
std::cout << "bbb" << std::endl;
BOOST_TEST(!ec);
BOOST_TEST(!seen2);
BOOST_TEST(seen3);
seen1 = true;
});
conn.async_exec(req2, adapt(), [&](auto ec, auto){
std::cout << "ccc" << std::endl;
BOOST_TEST(!ec);
BOOST_TEST(seen1);
BOOST_TEST(seen3);
seen2 = true;
});
conn.async_exec(req3, adapt(), [&](auto ec, auto){
std::cout << "ddd" << std::endl;
BOOST_TEST(!ec);
BOOST_TEST(!seen1);
BOOST_TEST(!seen2);
seen3 = true;
});
conn.async_run([](auto ec){
BOOST_TEST(!ec);
});
ioc.run();
}
BOOST_AUTO_TEST_CASE(wrong_response_data_type)
{
request req;
resp3::request req;
req.push("HELLO", 3);
req.push("QUIT");
@@ -52,7 +109,7 @@ BOOST_AUTO_TEST_CASE(wrong_response_data_type)
BOOST_AUTO_TEST_CASE(cancel_request_if_not_connected)
{
request req;
resp3::request req;
req.get_config().cancel_if_not_connected = true;
req.push("HELLO", 3);
req.push("PING");
@@ -66,16 +123,23 @@ BOOST_AUTO_TEST_CASE(cancel_request_if_not_connected)
ioc.run();
}
BOOST_AUTO_TEST_CASE(request_retry)
// TODO: This test is broken.
BOOST_AUTO_TEST_CASE(request_retry_false)
{
request req1;
req1.get_config().cancel_on_connection_lost = true;
req1.push("HELLO", 3);
req1.push("CLIENT", "PAUSE", 7000);
resp3::request req0;
req0.get_config().coalesce = false;
req0.get_config().cancel_on_connection_lost = true;
req0.push("HELLO", 3);
request req2;
resp3::request req1;
req1.get_config().coalesce = true;
req1.get_config().cancel_on_connection_lost = true;
req1.push("BLPOP", "any", 0);
resp3::request req2;
req2.get_config().coalesce = true;
req2.get_config().cancel_on_connection_lost = false;
req2.get_config().retry = false;
req2.get_config().retry_on_connection_lost = false;
req2.push("PING");
net::io_context ioc;
@@ -93,10 +157,14 @@ BOOST_AUTO_TEST_CASE(request_retry)
auto const endpoints = resolve();
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req1, adapt(), [](auto ec, auto){
conn.async_exec(req0, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
conn.async_exec(req1, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_exec(req2, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
@@ -107,3 +175,70 @@ BOOST_AUTO_TEST_CASE(request_retry)
ioc.run();
}
BOOST_AUTO_TEST_CASE(request_retry_true)
{
resp3::request req0;
req0.get_config().coalesce = false;
req0.get_config().cancel_on_connection_lost = true;
req0.push("HELLO", 3);
resp3::request req1;
req1.get_config().coalesce = true;
req1.get_config().cancel_on_connection_lost = true;
req1.push("BLPOP", "any", 0);
resp3::request req2;
req2.get_config().coalesce = true;
req2.get_config().cancel_on_connection_lost = false;
req2.get_config().retry_on_connection_lost = true;
req2.push("PING");
resp3::request req3;
req3.get_config().coalesce = true;
req3.get_config().cancel_on_connection_lost = true;
req3.get_config().retry_on_connection_lost = false;
req3.push("QUIT");
net::io_context ioc;
connection conn{ioc};
net::steady_timer st{ioc};
st.expires_after(std::chrono::seconds{1});
st.async_wait([&](auto){
// Cancels the request before receiving the response. This
// should cause the second request to complete with error
// although it has cancel_on_connection_lost = false.
conn.cancel(aedis::operation::run);
});
auto const endpoints = resolve();
net::connect(conn.next_layer(), endpoints);
conn.async_exec(req0, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
conn.async_exec(req1, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
});
conn.async_exec(req2, adapt(), [&](auto ec, auto){
BOOST_TEST(!ec);
conn.async_exec(req3, adapt(), [&](auto ec, auto){
BOOST_TEST(!ec);
});
});
conn.async_run([&](auto ec){
BOOST_CHECK_EQUAL(ec, boost::system::errc::errc_t::operation_canceled);
conn.reset_stream();
net::connect(conn.next_layer(), endpoints);
conn.async_run([&](auto ec){
std::cout << ec.message() << std::endl;
BOOST_TEST(!ec);
});
});
ioc.run();
}

View File

@@ -8,48 +8,48 @@
#include <boost/asio.hpp>
#ifdef BOOST_ASIO_HAS_CO_AWAIT
#include <boost/system/errc.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#define BOOST_TEST_MODULE low level
#include <boost/test/included/unit_test.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
#include "common.hpp"
#include "../examples/common/common.hpp"
// NOTE1: Sends hello separately. I have observed that if hello and
// blpop are sent toguether, Redis will send the response of hello
// right away, not waiting for blpop. That is why we have to send it
// separately here.
namespace net = boost::asio;
using aedis::resp3::request;
namespace resp3 = aedis::resp3;
using error_code = boost::system::error_code;
using namespace net::experimental::awaitable_operators;
using aedis::operation;
using aedis::adapt;
using connection = aedis::connection;
using error_code = boost::system::error_code;
#include <boost/asio/experimental/awaitable_operators.hpp>
using namespace net::experimental::awaitable_operators;
auto async_run(std::shared_ptr<connection> conn, error_code expected) -> net::awaitable<void>
auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
boost::system::error_code ec;
co_await conn->async_run(net::redirect_error(net::use_awaitable, ec));
std::cout << ec.message() << std::endl;
BOOST_CHECK_EQUAL(ec, expected);
}
auto conn = std::make_shared<connection>(ex);
co_await connect(conn, "127.0.0.1", "6379");
auto async_cancel_exec(std::shared_ptr<connection> conn) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
conn->async_run([conn](auto ec) {
BOOST_TEST(!ec);
});
net::steady_timer st{ex};
st.expires_after(std::chrono::seconds{1});
boost::system::error_code ec1;
// See NOTE1.
resp3::request req0;
req0.get_config().coalesce = false;
req0.push("HELLO", 3);
std::ignore = co_await conn->async_exec(req0, adapt(), net::use_awaitable);
request req1;
resp3::request req1;
req1.get_config().coalesce = false;
req1.push("HELLO", 3);
req1.push("BLPOP", "any", 3);
// Should not be canceled.
@@ -57,22 +57,23 @@ auto async_cancel_exec(std::shared_ptr<connection> conn) -> net::awaitable<void>
BOOST_TEST(!ec);
});
request req2;
resp3::request req2;
req2.get_config().coalesce = false;
req2.push("PING", "second");
// Should be canceled.
conn->async_exec(req1, adapt(), [](auto ec, auto){
conn->async_exec(req2, adapt(), [](auto ec, auto){
BOOST_CHECK_EQUAL(ec, net::error::basic_errors::operation_aborted);
});
// Will complete while BLPOP is pending.
boost::system::error_code ec1;
co_await st.async_wait(net::redirect_error(net::use_awaitable, ec1));
conn->cancel(operation::exec);
BOOST_TEST(!ec1);
request req3;
resp3::request req3;
req3.push("QUIT");
// Test whether the connection remains usable after a call to
@@ -82,64 +83,100 @@ auto async_cancel_exec(std::shared_ptr<connection> conn) -> net::awaitable<void>
BOOST_TEST(!ec1);
}
// Drives the explicit cancel-exec scenario: connects the shared
// connection synchronously, then spawns two detached coroutines on the
// same io_context — the reader loop (async_run, expected to finish with
// the default/empty error code) and the coroutine that issues and then
// cancels requests.
BOOST_AUTO_TEST_CASE(cancel_exec_with_timer)
{
net::io_context ioc;
auto const endpoints = resolve();
auto conn = std::make_shared<connection>(ioc);
// Establish the underlying TCP stream before spawning the coroutines.
net::connect(conn->next_layer(), endpoints);
// `{}` is the error code async_run is expected to complete with.
net::co_spawn(ioc.get_executor(), async_run(conn, {}), net::detached);
net::co_spawn(ioc.get_executor(), async_cancel_exec(conn), net::detached);
ioc.run();
}
auto async_ignore_cancel_of_written_req(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto ignore_implicit_cancel_of_req_written() -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
co_await connect(conn, "127.0.0.1", "6379");
// Calls async_run separately from the group of ops below to avoid
// having it canceled when the timer fires.
conn->async_run([conn](auto ec) {
BOOST_CHECK_EQUAL(ec, net::error::basic_errors::operation_aborted);
});
// See NOTE1.
resp3::request req0;
req0.get_config().coalesce = false;
req0.push("HELLO", 3);
std::ignore = co_await conn->async_exec(req0, adapt(), net::use_awaitable);
// Will be cancelled after it has been written but before the
// response arrives.
resp3::request req1;
req1.get_config().coalesce = false;
req1.push("BLPOP", "any", 3);
// Will be cancelled before it is written.
resp3::request req2;
req2.get_config().coalesce = false;
//req2.get_config().cancel_on_connection_lost = true;
req2.push("PING");
net::steady_timer st{ex};
st.expires_after(std::chrono::seconds{1});
net::steady_timer st2{ex};
st2.expires_after(std::chrono::seconds{3});
boost::system::error_code ec1, ec2, ec3;
request req1; // Will be cancelled after it has been written.
req1.get_config().coalesce = false;
req1.push("HELLO", 3);
req1.push("BLPOP", "any", 3);
request req2; // Will be cancelled.
req2.push("PING");
co_await (
conn->async_exec(req1, adapt(), net::redirect_error(net::use_awaitable, ec1)) ||
conn->async_exec(req2, adapt(), net::redirect_error(net::use_awaitable, ec2)) ||
st.async_wait(net::redirect_error(net::use_awaitable, ec3))
conn->async_exec(req1, adapt(), redir(ec1)) ||
conn->async_exec(req2, adapt(), redir(ec2)) ||
st.async_wait(redir(ec3))
);
BOOST_TEST(!ec1);
BOOST_CHECK_EQUAL(ec2, net::error::basic_errors::operation_aborted);
BOOST_TEST(!ec3);
conn->cancel(operation::run);
}
// Verifies that a request that has already been written (BLPOP, which
// blocks server-side) is canceled with operation_aborted when the run
// operation itself is canceled — here, implicitly, when the timer wins
// the awaitable-or race and the group is torn down.
auto cancel_of_req_written_on_run_canceled() -> net::awaitable<void>
{
// HELLO is sent on its own (coalesce = false); see NOTE1 at the top
// of this file for why it must not be grouped with BLPOP.
resp3::request req0;
req0.get_config().coalesce = false;
req0.push("HELLO", 3);
// BLPOP with timeout 0 never completes on its own; it must be
// canceled (not retried) when the connection is lost.
resp3::request req1;
req1.get_config().cancel_on_connection_lost = true;
req1.get_config().retry_on_connection_lost = false;
req1.push("BLPOP", "any", 0);
auto ex = co_await net::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
co_await connect(conn, "127.0.0.1", "6379");
// Timer fires after 1s, ending the || group below while BLPOP is
// still pending on the server.
net::steady_timer st{ex};
st.expires_after(std::chrono::seconds{1});
boost::system::error_code ec0, ec1, ec2, ec3;
// HELLO must complete first (&&); then BLPOP, async_run and the
// timer race each other (||).
co_await (
conn->async_exec(req0, adapt(), redir(ec0)) &&
(conn->async_exec(req1, adapt(), redir(ec1)) ||
conn->async_run(redir(ec2)) ||
st.async_wait(redir(ec3)))
);
BOOST_TEST(!ec0);
// Both the written-but-unanswered exec and the run op are aborted;
// the timer itself completes cleanly.
BOOST_CHECK_EQUAL(ec1, net::error::basic_errors::operation_aborted);
BOOST_CHECK_EQUAL(ec2, net::error::basic_errors::operation_aborted);
BOOST_TEST(!ec3);
}
BOOST_AUTO_TEST_CASE(ignore_cancel_of_written_req)
BOOST_AUTO_TEST_CASE(test_ignore_explicit_cancel_of_req_written)
{
auto const endpoints = resolve();
net::io_context ioc;
auto conn = std::make_shared<connection>(ioc);
net::connect(conn->next_layer(), endpoints);
error_code expected = net::error::operation_aborted;
net::co_spawn(ioc.get_executor(), async_run(conn, expected), net::detached);
net::co_spawn(ioc.get_executor(), async_ignore_cancel_of_written_req(conn), net::detached);
ioc.run();
run(async_ignore_explicit_cancel_of_req_written());
}
// Runs the implicit-cancellation coroutine to completion via the run()
// helper (presumably from the shared test/example headers included
// above — confirm in common.hpp).
BOOST_AUTO_TEST_CASE(test_ignore_implicit_cancel_of_req_written)
{
run(ignore_implicit_cancel_of_req_written());
}
// Runs the written-request-canceled-on-run-cancel coroutine to
// completion via the run() helper.
BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled)
{
run(cancel_of_req_written_on_run_canceled());
}
#else
int main(){}
#endif

View File

@@ -49,7 +49,6 @@ BOOST_AUTO_TEST_CASE(push_filtered_out)
});
conn.async_run([](auto ec){
std::cout << "===> " << ec.message() << std::endl;
BOOST_TEST(!ec);
});
@@ -101,7 +100,7 @@ net::awaitable<void> push_consumer1(connection& conn, bool& push_received)
struct adapter_error {
void
operator()(
std::size_t, aedis::resp3::node<boost::string_view> const&, boost::system::error_code& ec)
std::size_t, aedis::resp3::node<std::string_view> const&, boost::system::error_code& ec)
{
ec = aedis::error::incompatible_size;
}

View File

@@ -32,11 +32,14 @@ BOOST_AUTO_TEST_CASE(test_quit_no_coalesce)
connection conn{ioc};
net::connect(conn.next_layer(), endpoints);
request req1{{false, false}};
req1.push("HELLO", 3);
request req1;
req1.get_config().cancel_on_connection_lost = false;
req1.get_config().coalesce = false;
req1.push("PING");
request req2{{false, false}};
request req2;
req2.get_config().cancel_on_connection_lost = false;
req2.get_config().coalesce = false;
req2.push("QUIT");
conn.async_exec(req1, adapt(), [](auto ec, auto){

View File

@@ -31,7 +31,6 @@ BOOST_AUTO_TEST_CASE(test_quit_coalesce)
net::connect(conn.next_layer(), endpoints);
request req1{{false, true}};
req1.push("HELLO", 3);
req1.push("PING");
request req2{{false, true}};

View File

@@ -14,12 +14,12 @@
#include <aedis.hpp>
#include <aedis/src.hpp>
#include "common.hpp"
#include "../examples/common/common.hpp"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::resp3::request;
using aedis::adapt;
using connection = aedis::connection;
using error_code = boost::system::error_code;
#include <boost/asio/experimental/awaitable_operators.hpp>
@@ -29,7 +29,7 @@ net::awaitable<void> test_reconnect_impl()
{
auto ex = co_await net::this_coro::executor;
request req;
resp3::request req;
req.push("QUIT");
auto const endpoints = resolve();
@@ -68,41 +68,42 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
net::steady_timer st{ex};
auto conn = std::make_shared<connection>(ex);
auto const endpoints = resolve();
boost::system::error_code ec1, ec2, ec3;
request req1;
resp3::request req1;
req1.get_config().cancel_if_not_connected = false;
req1.get_config().cancel_on_connection_lost = true;
req1.get_config().retry_on_connection_lost = false;
req1.push("HELLO", 3);
req1.push("CLIENT", "PAUSE", 7000);
req1.push("BLPOP", "any", 0);
net::connect(conn->next_layer(), endpoints);
co_await connect(conn, "127.0.0.1", "6379");
st.expires_after(std::chrono::seconds{1});
co_await (
conn->async_exec(req1, adapt(), net::redirect_error(net::use_awaitable, ec1)) ||
conn->async_run(net::redirect_error(net::use_awaitable, ec2)) ||
st.async_wait(net::redirect_error(net::use_awaitable, ec3))
conn->async_exec(req1, adapt(), redir(ec1)) ||
conn->async_run(redir(ec2)) ||
st.async_wait(redir(ec3))
);
BOOST_TEST(!ec1);
//BOOST_TEST(!ec1);
BOOST_CHECK_EQUAL(ec2, boost::system::errc::errc_t::operation_canceled);
//BOOST_TEST(!ec3);
request req2;
resp3::request req2;
req2.get_config().cancel_if_not_connected = false;
req2.get_config().cancel_on_connection_lost = true;
req2.get_config().retry_on_connection_lost= false;
req2.push("HELLO", 3);
req2.push("QUIT");
net::connect(conn->next_layer(), endpoints);
co_await connect(conn, "127.0.0.1", "6379");
st.expires_after(std::chrono::seconds{1});
co_await (
conn->async_exec(req1, adapt(), net::redirect_error(net::use_awaitable, ec1)) ||
conn->async_run(net::redirect_error(net::use_awaitable, ec2)) ||
st.async_wait(net::redirect_error(net::use_awaitable, ec3))
);
std::cout << "ccc" << std::endl;
BOOST_CHECK_EQUAL(ec1, boost::system::errc::errc_t::operation_canceled);
BOOST_CHECK_EQUAL(ec2, boost::asio::error::basic_errors::operation_aborted);
@@ -110,9 +111,7 @@ auto async_test_reconnect_timeout() -> net::awaitable<void>
BOOST_AUTO_TEST_CASE(test_reconnect_and_idle)
{
net::io_context ioc;
net::co_spawn(ioc, async_test_reconnect_timeout(), net::detached);
ioc.run();
run(async_test_reconnect_timeout());
}
#else
int main(){}

View File

@@ -40,10 +40,7 @@ auto async_cancel_run_with_timer() -> net::awaitable<void>
st.expires_after(std::chrono::seconds{1});
boost::system::error_code ec1, ec2;
co_await (
conn.async_run(net::redirect_error(net::use_awaitable, ec1)) ||
st.async_wait(net::redirect_error(net::use_awaitable, ec2))
);
co_await (conn.async_run(redir(ec1)) || st.async_wait(redir(ec2)));
BOOST_CHECK_EQUAL(ec1, boost::asio::error::basic_errors::operation_aborted);
BOOST_TEST(!ec2);
@@ -69,10 +66,7 @@ async_check_cancellation_not_missed(int n, std::chrono::milliseconds ms) -> net:
timer.expires_after(ms);
net::connect(conn.next_layer(), endpoints);
boost::system::error_code ec1, ec2;
co_await (
conn.async_run(net::redirect_error(net::use_awaitable, ec1)) ||
timer.async_wait(net::redirect_error(net::use_awaitable, ec2))
);
co_await (conn.async_run(redir(ec1)) || timer.async_wait(redir(ec2)));
BOOST_CHECK_EQUAL(ec1, boost::asio::error::basic_errors::operation_aborted);
std::cout << "Counter: " << i << std::endl;
}
@@ -174,6 +168,31 @@ BOOST_AUTO_TEST_CASE(reset_before_run_completes)
ioc.run();
}
using slave_operation = aedis::detail::guarded_operation<>;
// "Master" side of the guarded_operation pair: runs the guard op so the
// waiting side (slave(), defined below in this file) can proceed.
// slave_operation is aedis::detail::guarded_operation<> — an internal
// (detail) type, so its exact semantics should be confirmed against the
// aedis headers.
auto master(std::shared_ptr<slave_operation> op) -> net::awaitable<void>
{
co_await op->async_run(net::use_awaitable);
}
// "Slave" side of the guarded_operation pair: wraps a 1s timer wait in
// op->async_wait() — i.e. the deferred timer wait is handed to the
// guard, which runs it under its own coordination (NOTE(review):
// guarded_operation is a detail type; confirm exact guarantees).
// The console output marks that the guarded wait completed.
auto slave(std::shared_ptr<slave_operation> op) -> net::awaitable<void>
{
net::steady_timer timer{co_await net::this_coro::executor};
timer.expires_after(std::chrono::seconds{1});
// net::deferred packages the wait as a deferred operation that the
// guard launches itself.
co_await op->async_wait(timer.async_wait(net::deferred), net::use_awaitable);
std::cout << "Kabuf" << std::endl;
}
// Smoke test for guarded_operation: spawns the master (guard runner)
// and slave (guarded waiter) coroutines on one io_context, sharing the
// same operation object, and runs the context to completion.
BOOST_AUTO_TEST_CASE(slave_op)
{
net::io_context ioc;
auto op = std::make_shared<slave_operation>(ioc.get_executor());
net::co_spawn(ioc, master(op), net::detached);
net::co_spawn(ioc, slave(op), net::detached);
ioc.run();
}
#else
int main(){}
#endif

View File

@@ -24,7 +24,6 @@
// TODO: Test with empty strings.
namespace std
{
auto operator==(aedis::ignore, aedis::ignore) noexcept {return true;}
@@ -893,10 +892,22 @@ BOOST_AUTO_TEST_CASE(type_convert)
BOOST_AUTO_TEST_CASE(adapter)
{
using aedis::adapt;
using resp3::type;
boost::system::error_code ec;
std::string a;
int b;
auto resp = std::tie(a, b, std::ignore);
std::string s;
auto resp = std::tie(s, std::ignore);
auto f = adapt(resp);
(void)f;
f(0, resp3::node<std::string_view>{type::simple_string, 1, 0, "Hello"}, ec);
f(1, resp3::node<std::string_view>{type::number, 1, 0, "42"}, ec);
BOOST_CHECK_EQUAL(a, "Hello");
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(b, 42);
BOOST_TEST(!ec);
}

View File

@@ -10,7 +10,7 @@
#define BOOST_TEST_MODULE low level
#include <boost/test/included/unit_test.hpp>
#include <aedis.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/src.hpp>
using aedis::resp3::request;