mirror of https://github.com/boostorg/redis.git synced 2026-01-24 06:22:07 +00:00

Compare commits

...

33 Commits

Author SHA1 Message Date
Marcelo Zimbres  d26ecb65ca  Improvements in the docs.  2022-08-07 11:32:50 +02:00
Marcelo Zimbres  c57f97b8c1  Improvements in the examples.  2022-08-06 23:12:32 +02:00
Marcelo Zimbres  37ab1e7387  Support for reconnection.  2022-08-06 18:11:12 +02:00
Marcelo Zimbres  54d448cad4  Progresses with connection events.  2022-08-06 13:06:05 +02:00
Marcelo Zimbres  97428dedb3  Progresses with reconnection.  2022-08-04 23:58:04 +02:00
Marcelo Zimbres  83802f217a  Prepares for events.  2022-08-03 21:40:43 +02:00
Marcelo Zimbres  08140f9186  Fixes async_exec function.  2022-08-02 21:52:34 +02:00
Marcelo Zimbres  3ddb017edb  Adds automatic AUTH and HELLO.  2022-08-01 22:46:34 +02:00
Marcelo Zimbres  20328cd423  Don't cancel the push channel when async_run exits.  2022-08-01 21:50:38 +02:00
Marcelo Zimbres  6577ddbaab  First steps.  2022-07-31 22:10:49 +02:00
Marcelo Zimbres  217d2bd87b  Progresses with the synchronous exec functions.  2022-07-31 11:50:23 +02:00
Marcelo Zimbres  f96dd22153  Improves executor usage in sync wrapper.  2022-07-30 23:35:28 +02:00
Marcelo Zimbres  f1fd0cfa8c  Removes warnings on g++.  2022-07-30 23:18:24 +02:00
Marcelo Zimbres  8728914109  Adds changelog, fixes CI file, improvements in the docs.  2022-07-30 09:31:11 +02:00
Marcelo Zimbres  e0041ac7ae  Progresses with async_failover function.  2022-07-28 22:00:42 +02:00
Marcelo Zimbres  317a185eb0  Adds in the sync_wrapper class.  2022-07-27 22:18:00 +02:00
Marcelo Zimbres  aa81200a8f  Adds assert to check the response tuple is compatible with the request size.  2022-07-26 22:10:31 +02:00
Marcelo Zimbres  55fc0e861c  Fixes bug on reconnection.  2022-07-26 21:47:14 +02:00
Marcelo Zimbres  04271855b0  Adds example on how to use Aedis synchronously.  2022-07-25 22:53:03 +02:00
Marcelo Zimbres  700e0c823e  First steps with the CI file.  2022-07-24 21:21:02 +02:00
Marcelo Zimbres  63c6465a4a  Improvements in the docs and subscriber example with reconnection.  2022-07-24 16:33:13 +02:00
Marcelo Zimbres  c86422cf50  Moves files to include directory.  2022-07-24 00:03:19 +02:00
Marcelo Zimbres  0168ed5faf  Fixes build for clang++-14,13,11.  2022-07-23 14:55:01 +02:00
Marcelo Zimbres  7bffa252f4  Improvements in the documentation.  2022-07-21 22:05:26 +02:00
Marcelo Zimbres  0bb65599c4  Simplifies the char_room example.  2022-07-21 21:35:52 +02:00
Marcelo Zimbres  edd538944f  Uses the correct executor in the exec timer.  2022-07-19 22:01:09 +02:00
Marcelo Zimbres  42880e788b  Simplifies aedis header.  2022-07-17 18:42:24 +02:00
Marcelo Zimbres  bcc3917174  Test improvements.  2022-07-17 10:47:12 +02:00
Marcelo Zimbres  b08dd63192  Updates benchmark doc.  2022-07-16 21:25:57 +02:00
Marcelo Zimbres  76b6106caa  Fixes executor usage in connection class.  2022-07-16 21:21:13 +02:00
Marcelo Zimbres  ab68e8a31d  Updates to a more recent Tokio version and uses single thread.  2022-07-16 20:00:36 +02:00
Marcelo Zimbres  2673557ce5  More corrections.  2022-07-16 14:30:16 +02:00
Marcelo Zimbres  2a302dcb65  Corrections to the benchmark document.  2022-07-16 14:25:38 +02:00
54 changed files with 2080 additions and 1238 deletions

43
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,43 @@
name: CI
on:
  push:
    branches: [ master ]
jobs:
  build:
    strategy:
      fail-fast: false
      matrix:
        build-type: ['sanity']
        runs-on: [ubuntu-22.04]
        compiler: [g++-11, clang++-11, clang++-13]
        cxx-std: ['c++20']
        optim-level: ['-O0']
    runs-on: ${{ matrix.runs-on }}
    env:
      CXX: ${{ matrix.compiler }}
      CXXFLAGS: -std=${{ matrix.cxx-std }} ${{ matrix.optim-level }} -Wall -Wextra
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      - name: Install Autotools
        run: sudo apt install automake
      - name: Install compiler
        run: sudo apt-get install -y ${{ matrix.compiler }}
      - name: Install Redis
        run: sudo apt-get install -y redis-server
      - name: Install boost
        uses: MarkusJx/install-boost@v2.3.0
        id: install-boost
        with:
          boost_version: 1.79.0
          platform_version: 22.04
      - name: Configure
        run: |
          autoreconf -i
          ./configure --with-boost=${{ steps.install-boost.outputs.BOOST_ROOT }}
      - name: Build
        run: make
      - name: Check
        run: make check VERBOSE=1

76
CHANGELOG.md Normal file
View File

@@ -0,0 +1,76 @@
# Changelog
## v0.3.0
* Adds `experimental::exec` and `receive_event` functions to offer a
thread safe and synchronous way of executing requests across
threads. See `intro_sync.cpp` and `subscriber_sync.cpp` for
examples.
* `connection::async_read_push` was renamed to `async_receive_event`.
* `connection::async_receive_event` is now being used to communicate
internal events to the user, such as resolve, connect, push etc. For
examples see subscriber.cpp and `connection::event`.
* The `aedis` directory has been moved to `include` to look more
similar to Boost libraries. Users should now replace `-I/aedis-path`
with `-I/aedis-path/include` in the compiler flags.
* The `AUTH` and `HELLO` commands are now sent automatically. This change was
necessary to implement reconnection. The username and password
used in `AUTH` should be provided by the user on
`connection::config`.
* Adds support for reconnection. See `connection::enable_reconnect` and the usage sketch after this list.
* Fixes a bug in the `connection::async_run(host, port)` overload
that was causing crashes on reconnection.
* Fixes the executor usage in the connection class. Before these
changes it was imposing `any_io_executor` on users.
* `connection::async_receive_event` is not cancelled anymore when
`connection::async_run` exits. This change makes user code simpler.
* The `connection::async_exec` overload taking host and port has been
removed. Use the `connection::async_run` overload that takes a request and a response adapter instead.
* The host and port parameters from `connection::async_run` have been
moved to `connection::config` to better support authentication and
failover.
* Many simplifications in the `chat_room` example.
* Fixes the build with clang compilers and makes some improvements in
the documentation.
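A minimal sketch of the new reconnection and event switches, mirroring the subscriber.cpp example in this changeset; the `connection::config` members shown (`enable_events`, `enable_reconnect`) are taken from those examples and may differ in later releases:

```cpp
#include <boost/asio.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp> // include in no more than one .cpp file

namespace net = boost::asio;
using connection = aedis::connection<>;

int main()
{
   net::io_context ioc{1};
   connection db{ioc};

   // Opt in to connection events and automatic reconnection.
   db.get_config().enable_events = true;
   db.get_config().enable_reconnect = true;

   // HELLO (and AUTH, if configured) are now sent automatically.
   db.async_run(net::detached);

   ioc.run();
}
```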
## v0.2.1
* Fixes a bug that happens on very high load.
## v0.2.0
* Major rewrite of the high-level API. There is no need to use the low-level API anymore.
* No more callbacks: Sending requests follows the ASIO asynchronous model.
* Support for reconnection: Pending requests are not canceled when a connection is lost and are re-sent when a new one is established.
* The library no longer sends HELLO-3 on the user's behalf. This is important to support AUTH properly.
## v0.1.2
* Adds reconnect coroutine in the `echo_server` example.
* Corrects `client::async_wait_for_data` with `make_parallel_group` to launch the operation.
* Improvements in the documentation.
* Avoids dynamic memory allocation in the client class after reconnection.
## v0.1.1
* Improves the documentation and adds some features to the high-level client.
## v0.1.0
* Improvements in the design and documentation.
## v0.0.1
* First release to collect design feedback.

2
CMakeLists.txt Normal file
View File

@@ -0,0 +1,2 @@
# This is ongoing work. At the moment autotools is still the supported
# build system.

View File

@@ -6,24 +6,26 @@ DISTCHECK_CONFIGURE_FLAGS = CPPFLAGS="$(BOOST_CPPFLAGS) $(CPPFLAGS)" LDFLAGS="$(
AM_CPPFLAGS =
AM_CPPFLAGS += $(BOOST_CPPFLAGS)
#AM_CPPFLAGS += -I$(top_srcdir)/include
AM_CPPFLAGS += -I$(top_srcdir)/include
AM_LDFLAGS =
AM_LDFLAGS += -pthread
SUBDIRS = include
check_PROGRAMS =
check_PROGRAMS += intro_sync
check_PROGRAMS += low_level_sync
check_PROGRAMS += intro
check_PROGRAMS += intro_sync
check_PROGRAMS += containers
check_PROGRAMS += serialization
check_PROGRAMS += test_low_level
if HAVE_CXX20
check_PROGRAMS += test_high_level
endif
check_PROGRAMS += test_connection
EXTRA_PROGRAMS =
if HAVE_COROUTINES
EXTRA_PROGRAMS += subscriber
if HAVE_CXX20
EXTRA_PROGRAMS += subscriber_sync
EXTRA_PROGRAMS += echo_server
EXTRA_PROGRAMS += echo_server_direct
EXTRA_PROGRAMS += chat_room
@@ -36,45 +38,22 @@ CLEANFILES += $(EXTRA_PROGRAMS)
.PHONY: all
all: $(check_PROGRAMS) $(EXTRA_PROGRAMS)
intro_sync_SOURCES = $(top_srcdir)/tests/intro_sync.cpp
subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp
low_level_sync_SOURCES = $(top_srcdir)/tests/low_level_sync.cpp
test_low_level_SOURCES = $(top_srcdir)/tests/low_level.cpp
intro_SOURCES = $(top_srcdir)/examples/intro.cpp
intro_sync_SOURCES = $(top_srcdir)/examples/intro_sync.cpp
containers_SOURCES = $(top_srcdir)/examples/containers.cpp
serialization_SOURCES = $(top_srcdir)/examples/serialization.cpp
if HAVE_CXX20
test_high_level_SOURCES = $(top_srcdir)/tests/high_level.cpp
test_connection_SOURCES = $(top_srcdir)/tests/connection.cpp
subscriber_sync_SOURCES = $(top_srcdir)/examples/subscriber_sync.cpp
if HAVE_COROUTINES
subscriber_SOURCES = $(top_srcdir)/examples/subscriber.cpp
chat_room_SOURCES = $(top_srcdir)/examples/chat_room.cpp
echo_server_SOURCES = $(top_srcdir)/examples/echo_server.cpp
echo_server_direct_SOURCES = $(top_srcdir)/benchmarks/cpp/asio/echo_server_direct.cpp
echo_server_client_SOURCES = $(top_srcdir)/benchmarks/cpp/asio/echo_server_client.cpp
endif
nobase_include_HEADERS =\
$(top_srcdir)/aedis/src.hpp\
$(top_srcdir)/aedis/error.hpp\
$(top_srcdir)/aedis/impl/error.ipp\
$(top_srcdir)/aedis/detail/net.hpp\
$(top_srcdir)/aedis/command.hpp\
$(top_srcdir)/aedis/impl/command.ipp\
$(top_srcdir)/aedis/connection.hpp\
$(top_srcdir)/aedis/adapt.hpp\
$(top_srcdir)/aedis/detail/connection_ops.hpp\
$(top_srcdir)/aedis/aedis.hpp\
$(top_srcdir)/aedis/adapter/detail/adapters.hpp\
$(top_srcdir)/aedis/adapter/adapt.hpp\
$(top_srcdir)/aedis/adapter/detail/response_traits.hpp\
$(top_srcdir)/aedis/resp3/node.hpp\
$(top_srcdir)/aedis/resp3/compose.hpp\
$(top_srcdir)/aedis/resp3/detail/read_ops.hpp\
$(top_srcdir)/aedis/resp3/detail/parser.hpp\
$(top_srcdir)/aedis/resp3/type.hpp\
$(top_srcdir)/aedis/resp3/read.hpp\
$(top_srcdir)/aedis/resp3/write.hpp\
$(top_srcdir)/aedis/resp3/request.hpp\
$(top_srcdir)/aedis/resp3/detail/impl/parser.ipp\
$(top_srcdir)/aedis/resp3/impl/type.ipp
nobase_noinst_HEADERS =\
$(top_srcdir)/examples/print.hpp\
$(top_srcdir)/tests/check.hpp

View File

@@ -1 +1,16 @@
See https://mzimbres.github.io/aedis/
An async Redis client designed for performance and scalability
### License
Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt).
### Build Status
Branch | GH Actions |
:-------------: | ---------- |
[`master`](https://github.com/mzimbres/aedis/tree/master) | [![CI](https://github.com/mzimbres/aedis/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/mzimbres/aedis/actions/workflows/ci.yml)
### More information
* See the official github-pages for documentation: https://mzimbres.github.io/aedis

View File

@@ -1,18 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_COMMAND_HPP
#define AEDIS_COMMAND_HPP
#include <boost/utility/string_view.hpp>
namespace aedis {
bool has_push_response(boost::string_view cmd);
} // aedis
#endif // AEDIS_COMMAND_HPP

View File

@@ -1,14 +1,12 @@
# TCP echo server performance.
# TCP echo server performance
This document describe benchmarks the performance of TCP echo servers
I implemented in different languages using different Redis clients.
The main motivations for choosing a TCP echo server as a benchmark
program are
This document benchmarks the performance of TCP echo servers I
implemented in different languages using different Redis clients. The
main motivations for choosing an echo server are
* Simple to implement and does not require expert-level knowledge in most languages.
* I/O bound: Echo servers have very low CPU consumption in general
and therefore are excellent as a measure of the ability of a
program to serve concurrent requests.
and therefore are excellent to measure how a program handles concurrent requests.
* It simulates very well a typical backend in regard to concurrency.
I also imposed some constraints on the implementations
@@ -25,14 +23,14 @@ be seen below
![](https://mzimbres.github.io/aedis/tcp-echo-direct.png)
The tests were performed with a 1000 concurrent TCP connection on the
The tests were performed with 1000 concurrent TCP connections on the
localhost where latency is 0.07ms on average on my machine. On higher
latency networks the difference among libraries is expected to
decrease.
### Remarks:
### Remarks
* I was not expecting Asio to perform so much better than Tokio and libuv.
* I expected Libuv to have similar performance to Asio and Tokio.
* I did expect nodejs to come a little behind given it is
JavaScript code. Otherwise I did expect it to have similar
performance to libuv since it is the framework behind it.
@@ -50,14 +48,14 @@ The code used in the benchmarks can be found at
This is similar to the echo server described above but messages are
echoed by Redis and not by the echo-server itself, which acts
as a proxy between the client and the Redis server. The result
as a proxy between the client and the Redis server. The results
can be seen below
![](https://mzimbres.github.io/aedis/tcp-echo-over-redis.png)
The tests were performed on a network where latency is 35ms on
average, otherwise it is equal to the benchmarks above regarding the
number of TCP connection. The result can be seen below
average, otherwise it uses the same number of TCP connections
as the previous example.
### Remarks
@@ -66,8 +64,8 @@ in the graph, the reasons are
* [redis-rs](https://github.com/redis-rs/redis-rs): This client
comes so far behind that it can't even be represented together
with the other benchmarks without making them look insignificant. I
don't know for sure why it is so slow, I suppose it has
with the other benchmarks without making them look insignificant.
I don't know for sure why it is so slow, I suppose it has
something to do with its lack of proper
[pipelining](https://redis.io/docs/manual/pipelining/) support.
In fact, the more TCP connections I launch the worse its
@@ -84,6 +82,10 @@ The code used in the benchmarks can be found at
* [node-redis](https://github.com/redis/node-redis): [code](https://github.com/mzimbres/aedis/tree/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/nodejs/echo_server_over_redis)
* [go-redis](https://github.com/go-redis/redis): [code](https://github.com/mzimbres/aedis/blob/3fb018ccc6138d310ac8b73540391cdd8f2fdad6/benchmarks/go/echo_server_over_redis.go)
## Running the benchmarks
Run one of the echo-server programs in one terminal and the [echo-server-client](https://github.com/mzimbres/aedis/blob/42880e788bec6020dd018194075a211ad9f339e8/benchmarks/cpp/asio/echo_server_client.cpp) in another.
## Contributing
If you spot any performance improvement in any of the examples or

View File

@@ -25,7 +25,7 @@
]
\addplot coordinates {
(31.1,Asio)
(43.2,Tokio)
(30.7,Tokio)
(43.6,Libuv)
(74.2,Nodejs)
(81.0,Go)

View File

@@ -8,23 +8,22 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <cstdio>
#include <boost/asio.hpp>
namespace net = boost::asio;
namespace this_coro = net::this_coro;
using net::ip::tcp;
using net::awaitable;
using net::co_spawn;
using net::detached;
using net::use_awaitable;
using executor_type = net::io_context::executor_type;
using socket_type = net::basic_stream_socket<net::ip::tcp, executor_type>;
using tcp_socket = net::use_awaitable_t<executor_type>::as_default_on_t<socket_type>;
using acceptor_type = net::basic_socket_acceptor<net::ip::tcp, executor_type>;
using tcp_acceptor = net::use_awaitable_t<executor_type>::as_default_on_t<acceptor_type>;
using awaitable_type = net::awaitable<void, executor_type>;
constexpr net::use_awaitable_t<executor_type> use_awaitable;
awaitable<void> echo(tcp::socket socket)
awaitable_type echo(tcp_socket socket)
{
try {
char data[1024];
@@ -37,20 +36,20 @@ awaitable<void> echo(tcp::socket socket)
}
}
awaitable<void> listener()
awaitable_type listener()
{
auto executor = co_await this_coro::executor;
tcp::acceptor acceptor(executor, {tcp::v4(), 55555});
auto ex = co_await this_coro::executor;
tcp_acceptor acceptor(ex, {tcp::v4(), 55555});
for (;;) {
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(executor, echo(std::move(socket)), detached);
tcp_socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(ex, echo(std::move(socket)), detached);
}
}
int main()
{
try {
net::io_context io_context(1);
net::io_context io_context{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO};
co_spawn(io_context, listener(), detached);
io_context.run();
} catch (std::exception const& e) {

View File

@@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -10,15 +16,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bytes"
version = "0.5.6"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cfg-if"
@@ -33,34 +33,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
dependencies = [
"bitflags",
"fuchsia-zircon-sys",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures-core"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -70,44 +42,29 @@ dependencies = [
"libc",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836"
[[package]]
name = "lock_api"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
]
[[package]]
@@ -118,76 +75,14 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "mio"
version = "0.6.23"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4"
checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
dependencies = [
"cfg-if 0.1.10",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
"kernel32-sys",
"libc",
"log",
"miow 0.2.2",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio-named-pipes"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656"
dependencies = [
"log",
"mio",
"miow 0.3.7",
"winapi 0.3.9",
]
[[package]]
name = "mio-uds"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
dependencies = [
"iovec",
"libc",
"mio",
]
[[package]]
name = "miow"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d"
dependencies = [
"kernel32-sys",
"net2",
"winapi 0.2.8",
"ws2_32-sys",
]
[[package]]
name = "miow"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "net2"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
"wasi",
"windows-sys",
]
[[package]]
@@ -201,10 +96,39 @@ dependencies = [
]
[[package]]
name = "pin-project-lite"
version = "0.1.12"
name = "once_cell"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777"
checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "proc-macro2"
@@ -224,6 +148,21 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42"
dependencies = [
"bitflags",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@@ -234,10 +173,20 @@ dependencies = [
]
[[package]]
name = "slab"
version = "0.4.6"
name = "smallvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32"
checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
[[package]]
name = "socket2"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "syn"
@@ -252,33 +201,30 @@ dependencies = [
[[package]]
name = "tokio"
version = "0.2.25"
version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092"
checksum = "57aec3cfa4c296db7255446efb4928a6be304b431a806216105542a67b6ca82e"
dependencies = [
"autocfg",
"bytes",
"fnv",
"futures-core",
"iovec",
"lazy_static",
"libc",
"memchr",
"mio",
"mio-named-pipes",
"mio-uds",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2",
"tokio-macros",
"winapi 0.3.9",
"winapi",
]
[[package]]
name = "tokio-macros"
version = "0.2.6"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a"
checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
dependencies = [
"proc-macro2",
"quote",
@@ -292,10 +238,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c"
[[package]]
name = "winapi"
version = "0.2.8"
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
@@ -307,12 +253,6 @@ dependencies = [
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
@@ -326,11 +266,44 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "ws2_32-sys"
version = "0.2.1"
name = "windows-sys"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"winapi 0.2.8",
"winapi-build",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"

View File

@@ -6,7 +6,4 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "0.2", features = ["full"] }
[build]
target-dir = "/home/marcelo/local/rust/target"
tokio = { version = "1.0", features = ["full"] }

View File

@@ -1,9 +1,10 @@
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut listener = TcpListener::bind("127.0.0.1:55555").await?;
let listener = TcpListener::bind("127.0.0.1:55555").await?;
loop {
let (mut socket, _) = listener.accept().await?;

View File

@@ -1,9 +1,10 @@
AC_PREREQ([2.69])
AC_INIT([Aedis], [0.2.1], [mzimbres@gmail.com])
AC_INIT([Aedis], [0.3.0], [mzimbres@gmail.com])
AC_CONFIG_MACRO_DIR([m4])
#AC_CONFIG_SRCDIR([src/aedis.cpp])
AC_CONFIG_HEADERS([config.h])
AC_CONFIG_SRCDIR(include/aedis.hpp)
AM_INIT_AUTOMAKE([-Wall foreign])
AC_LANG(C++)
# Checks for programs.
AC_PROG_CXX
@@ -18,10 +19,36 @@ AC_CHECK_HEADER_STDBOOL
AC_TYPE_UINT64_T
AC_CHECK_TYPES([ptrdiff_t])
AX_CXX_COMPILE_STDCXX(14, , mandatory)
AX_CXX_COMPILE_STDCXX(17, , mandatory)
AX_CXX_COMPILE_STDCXX(20, , optional)
AM_CONDITIONAL(HAVE_CXX20,[test x$HAVE_CXX20 == x1])
AC_CONFIG_FILES([Makefile doc/Doxyfile])
# This check has been stolen from Asio
AC_MSG_CHECKING([whether coroutines are enabled])
AC_COMPILE_IFELSE(
[AC_LANG_PROGRAM(
[[#if defined(__clang__)]]
[[# if (__cplusplus >= 201703) && (__cpp_coroutines >= 201703)]]
[[# if __has_include(<experimental/coroutine>)]]
[[# define AEDIS_HAS_CO_AWAIT 1]]
[[# endif]]
[[# endif]]
[[#elif defined(__GNUC__)]]
[[# if (__cplusplus >= 201709) && (__cpp_impl_coroutine >= 201902)]]
[[# if __has_include(<coroutine>)]]
[[# define AEDIS_HAS_CO_AWAIT 1]]
[[# endif]]
[[# endif]]
[[#endif]]
[[#ifndef AEDIS_HAS_CO_AWAIT]]
[[# error coroutines not available]]
[[#endif]])],
[AC_MSG_RESULT([yes])
HAVE_COROUTINES=yes;],
[AC_MSG_RESULT([no])
HAVE_COROUTINES=no;])
AM_CONDITIONAL(HAVE_COROUTINES,test x$HAVE_COROUTINES = xyes)
AC_CONFIG_FILES([Makefile include/Makefile doc/Doxyfile])
AC_OUTPUT

View File

@@ -44,7 +44,7 @@ PROJECT_NUMBER = "@PACKAGE_VERSION@"
# for a project that appears at the top of each page and should give viewer a
# quick idea about the purpose of the project. Keep the description short.
PROJECT_BRIEF = "High level Redis client library"
PROJECT_BRIEF = "High level Redis client"
# With the PROJECT_LOGO tag one can specify a logo or an icon that is included
# in the documentation. The maximum height of the logo should not exceed 55
@@ -823,7 +823,7 @@ WARN_LOGFILE =
# spaces. See also FILE_PATTERNS and EXTENSION_MAPPING
# Note: If this tag is empty the current directory is searched.
INPUT = aedis examples
INPUT = include benchmarks/benchmarks.md CHANGELOG.md examples
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

View File

@@ -26,5 +26,5 @@ div.contents {
code
{
background-color:#f0e9ce;
background-color:#fffbeb;
}

View File

@@ -4,156 +4,91 @@
* accompanying file LICENSE.txt)
*/
#include <queue>
#include <vector>
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#include <aedis/aedis.hpp>
#include <aedis.hpp>
#include "unistd.h"
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
using connection = aedis::connection<tcp_socket>;
using response_type = std::vector<aedis::resp3::node<std::string>>;
class user_session:
public std::enable_shared_from_this<user_session> {
public:
user_session(tcp_socket socket)
: socket_(std::move(socket))
, timer_(socket_.get_executor())
{ timer_.expires_at(std::chrono::steady_clock::time_point::max()); }
// Chat over redis pubsub. To test, run this program from different
// terminals and type messages to stdin. You may also want to run
//
// $ redis-cli
// > monitor
//
// To see the message traffic.
void start(std::shared_ptr<connection> db)
{
co_spawn(socket_.get_executor(),
[self = shared_from_this(), db]{ return self->reader(db); },
net::detached);
co_spawn(socket_.get_executor(),
[self = shared_from_this()]{ return self->writer(); },
net::detached);
}
void deliver(std::string const& msg)
{
write_msgs_.push_back(msg);
timer_.cancel_one();
}
private:
net::awaitable<void> reader(std::shared_ptr<connection> db)
{
try {
std::string msg;
request req;
auto dbuffer = net::dynamic_buffer(msg, 1024);
for (;;) {
auto const n = co_await net::async_read_until(socket_, dbuffer, "\n");
req.push("PUBLISH", "channel", msg);
co_await db->async_exec(req);
req.clear();
msg.erase(0, n);
}
} catch (std::exception&) {
stop();
}
}
net::awaitable<void> writer()
{
try {
while (socket_.is_open()) {
if (write_msgs_.empty()) {
boost::system::error_code ec;
co_await timer_.async_wait(net::redirect_error(net::use_awaitable, ec));
} else {
co_await net::async_write(socket_, net::buffer(write_msgs_.front()));
write_msgs_.pop_front();
}
}
} catch (std::exception&) {
stop();
}
}
void stop()
{
socket_.close();
timer_.cancel();
}
tcp_socket socket_;
net::steady_timer timer_;
std::deque<std::string> write_msgs_;
};
using sessions_type = std::vector<std::shared_ptr<user_session>>;
net::awaitable<void>
reader(
std::shared_ptr<connection> db,
std::shared_ptr<sessions_type> sessions)
net::awaitable<void> subscriber(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "channel");
co_await db->async_exec(req);
req.push("SUBSCRIBE", "chat-channel");
for (response_type resp;;) {
co_await db->async_read_push(adapt(resp));
auto const ev = co_await db->async_receive_event(aedis::adapt(resp));
switch (ev) {
case connection::event::push:
print_push(resp);
break;
for (auto& session: *sessions)
session->deliver(resp.at(3).value);
case connection::event::hello:
co_await db->async_exec(req);
break;
default:;
}
resp.clear();
}
}
net::awaitable<void>
listener(
std::shared_ptr<tcp_acceptor> acc,
std::shared_ptr<connection> db,
std::shared_ptr<sessions_type> sessions)
publisher(net::posix::stream_descriptor& in, std::shared_ptr<connection> db)
{
request req;
req.push("HELLO", 3);
co_await db->async_exec(req);
for (;;) {
auto socket = co_await acc->async_accept();
auto session = std::make_shared<user_session>(std::move(socket));
sessions->push_back(session);
session->start(db);
for (std::string msg;;) {
std::size_t n = co_await net::async_read_until(in, net::dynamic_buffer(msg, 1024), "\n", net::use_awaitable);
request req;
req.push("PUBLISH", "chat-channel", msg);
co_await db->async_exec(req);
msg.erase(0, n);
}
}
auto handler =[](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
int main()
{
try {
net::io_context ioc{1};
net::posix::stream_descriptor in{ioc, ::dup(STDIN_FILENO)};
// Redis client and receiver.
auto db = std::make_shared<connection>(ioc);
db->async_run("127.0.0.1", "6379", handler);
db->get_config().enable_events = true;
auto sessions = std::make_shared<sessions_type>();
net::co_spawn(ioc, reader(db, sessions), net::detached);
co_spawn(ioc, publisher(in, db), net::detached);
co_spawn(ioc, subscriber(db), net::detached);
db->async_run([](auto ec) {
std::cout << ec.message() << std::endl;
});
// TCP acceptor.
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<tcp_acceptor>(ioc, endpoint);
co_spawn(ioc, listener(acc, db, sessions), net::detached);
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}
#else // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
int main() {}
#endif // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)

View File

@@ -7,20 +7,18 @@
#include <map>
#include <vector>
#include <iostream>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include <aedis.hpp>
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using boost::optional;
using aedis::adapt;
using aedis::resp3::request;
using connection = aedis::connection<>;
// $ redis-cli
// > ACL SETUSER mzimbres on >Jabuticaba ~* +@all
// OK
int main()
{
std::vector<int> vec
@@ -30,8 +28,6 @@ int main()
{{"key1", 10}, {"key2", 20}, {"key3", 30}};
request req;
req.push("AUTH", "mzimbres", "Jabuticaba");
req.push("HELLO", 3);
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);
req.push("MULTI");
@@ -41,8 +37,6 @@ int main()
req.push("QUIT");
std::tuple<
aedis::ignore, // auth
aedis::ignore, // hello
aedis::ignore, // rpush
aedis::ignore, // hset
aedis::ignore, // multi
@@ -54,10 +48,11 @@ int main()
net::io_context ioc;
connection db{ioc};
db.async_exec("127.0.0.1", "6379", req, aedis::adapt(resp),
[](auto ec, auto) { std::cout << ec.message() << std::endl; });
db.async_run(req, aedis::adapt(resp), [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});
ioc.run();
print(std::get<0>(std::get<7>(resp)).value());
print(std::get<1>(std::get<7>(resp)).value());
print(std::get<0>(std::get<5>(resp)).value());
print(std::get<1>(std::get<5>(resp)).value());
}

View File

@@ -4,23 +4,26 @@
* accompanying file LICENSE.txt)
*/
//#define BOOST_ASIO_HAS_IO_URING
//#define BOOST_ASIO_DISABLE_EPOLL
#include <string>
#include <iostream>
#include <boost/asio.hpp>
#include <aedis/aedis.hpp>
#include <aedis.hpp>
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
using executor_type = net::io_context::executor_type;
using socket_type = net::basic_stream_socket<net::ip::tcp, executor_type>;
using tcp_socket = net::use_awaitable_t<executor_type>::as_default_on_t<socket_type>;
using acceptor_type = net::basic_socket_acceptor<net::ip::tcp, executor_type>;
using tcp_acceptor = net::use_awaitable_t<executor_type>::as_default_on_t<acceptor_type>;
using awaitable_type = net::awaitable<void, executor_type>;
using connection = aedis::connection<tcp_socket>;
net::awaitable<void> echo_loop(tcp_socket socket, std::shared_ptr<connection> db)
awaitable_type echo_loop(tcp_socket socket, std::shared_ptr<connection> db)
{
try {
request req;
@@ -41,15 +44,11 @@ net::awaitable<void> echo_loop(tcp_socket socket, std::shared_ptr<connection> db
}
}
net::awaitable<void> listener()
awaitable_type listener()
{
auto ex = co_await net::this_coro::executor;
auto db = std::make_shared<connection>(ex);
db->async_run("127.0.0.1", "6379", net::detached);
request req;
req.push("HELLO", 3);
co_await db->async_exec(req);
db->async_run(net::detached);
tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555});
for (;;)
@@ -59,7 +58,7 @@ net::awaitable<void> listener()
int main()
{
try {
net::io_context ioc;
net::io_context ioc{BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO};
co_spawn(ioc, listener(), net::detached);
ioc.run();
} catch (std::exception const& e) {

View File

@@ -7,7 +7,9 @@
#include <tuple>
#include <string>
#include <boost/asio.hpp>
#include <aedis/aedis.hpp>
#include <aedis.hpp>
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
@@ -18,20 +20,19 @@ using connection = aedis::connection<>;
int main()
{
net::io_context ioc;
connection db{ioc};
request req;
req.push("HELLO", 3);
req.push("PING");
req.push("QUIT");
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
net::io_context ioc;
connection db{ioc};
db.async_exec("127.0.0.1", "6379", req, adapt(resp),
[](auto ec, auto) { std::cout << ec.message() << std::endl; });
std::tuple<std::string, aedis::ignore> resp;
db.async_run(req, adapt(resp), [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});
ioc.run();
std::cout << std::get<1>(resp) << std::endl;
std::cout << std::get<0>(resp) << std::endl;
}

45
examples/intro_sync.cpp Normal file
View File

@@ -0,0 +1,45 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <tuple>
#include <string>
#include <boost/asio.hpp>
#include <aedis.hpp>
#include <aedis/experimental/sync.hpp>
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using aedis::experimental::exec;
using connection = aedis::connection<>;
int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
std::thread thread{[&]() {
conn.async_run(net::detached);
ioc.run();
}};
request req;
req.push("PING");
req.push("QUIT");
std::tuple<std::string, aedis::ignore> resp;
exec(conn, req, adapt(resp));
thread.join();
std::cout << "Response: " << std::get<0>(resp) << std::endl;
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}

View File

@@ -54,3 +54,11 @@ void print(std::string const& e)
std::cout << e << std::endl;
}
void print_push(std::vector<aedis::resp3::node<std::string>>& resp)
{
std::cout
<< "Push type: " << resp.at(1).value << "\n"
<< "Channel: " << resp.at(2).value << "\n"
<< "Message: " << resp.at(3).value << "\n"
<< std::endl;
}

View File

@@ -12,10 +12,12 @@
#include <string>
#include <boost/json.hpp>
#include <boost/json/src.hpp>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include <aedis.hpp>
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::resp3::request;
using connection = aedis::connection<>;
@@ -59,7 +61,7 @@ void to_bulk(std::string& to, user const& u)
}
// Deserializes
void from_bulk(user& u, boost::string_view sv, boost::system::error_code& ec)
void from_bulk(user& u, boost::string_view sv, boost::system::error_code&)
{
value jv = parse(sv);
u = value_to<user>(jv);
@@ -98,8 +100,9 @@ int main()
std::tuple<aedis::ignore, int, std::set<user>, std::string> resp;
db.async_exec("127.0.0.1", "6379", req, aedis::adapt(resp),
[](auto ec, auto) { std::cout << ec.message() << std::endl; });
db.async_run(req, aedis::adapt(resp), [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});
ioc.run();

View File

@@ -7,57 +7,80 @@
#include <string>
#include <vector>
#include <iostream>
#include <tuple>
#include <boost/asio.hpp>
#include <aedis/aedis.hpp>
#include <aedis.hpp>
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using aedis::resp3::node;
using node_type = aedis::resp3::node<std::string>;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using connection = aedis::connection<tcp_socket>;
/* In this example we send a subscription to a channel and start
* reading server side messages indefinitely.
/* This example will subscribe and read pushes indefinitely.
*
* After starting the example you can test it by sending messages with
* redis-cli like this
* To test, send messages with redis-cli
*
* $ redis-cli -3
* 127.0.0.1:6379> PUBLISH channel1 some-message
* (integer) 3
* 127.0.0.1:6379>
*
* The messages will then appear on the terminal you are running the
* example.
* To test reconnection, try for example closing all clients currently
* connected to the Redis instance
*
* $ redis-cli
* > CLIENT kill TYPE pubsub
*/
net::awaitable<void> reader(std::shared_ptr<connection> db)
{
for (std::vector<node<std::string>> resp;;) {
auto n = co_await db->async_read_push(adapt(resp));
std::cout
<< "Size: " << n << "\n"
<< "Event: " << resp.at(1).value << "\n"
<< "Channel: " << resp.at(2).value << "\n"
<< "Message: " << resp.at(3).value << "\n"
<< std::endl;
resp.clear();
net::awaitable<void> receiver(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "channel");
for (std::vector<node_type> resp;;) {
auto const ev = co_await db->async_receive_event(aedis::adapt(resp));
std::cout << "Event: " << aedis::to_string<tcp_socket>(ev) << std::endl;
switch (ev) {
case connection::event::push:
print_push(resp);
resp.clear();
break;
case connection::event::hello:
// Subscribes to the channels when a new connection is
// established.
co_await db->async_exec(req);
break;
default:;
}
}
}
auto handler = [](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
int main()
{
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
db->async_exec("127.0.0.1", "6379", req, adapt(), handler);
net::co_spawn(ioc, reader(db), net::detached);
ioc.run();
try {
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
db->get_config().enable_events = true;
db->get_config().enable_reconnect = true;
net::co_spawn(ioc, receiver(db), net::detached);
db->async_run(net::detached);
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });
ioc.run();
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
}
}

View File

@@ -0,0 +1,71 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <tuple>
#include <string>
#include <boost/asio.hpp>
#include <aedis.hpp>
#include <aedis/experimental/sync.hpp>
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::resp3::request;
using aedis::experimental::exec;
using aedis::experimental::receive_event;
using connection = aedis::connection<>;
using aedis::resp3::node;
using event = connection::event;
// See subscriber.cpp for more info about how to run this example.
void subscriber(connection& conn)
{
request req;
req.push("SUBSCRIBE", "channel");
for (std::vector<node<std::string>> resp;;) {
auto const ev = receive_event(conn, aedis::adapt(resp));
switch (ev) {
case connection::event::push:
print_push(resp);
resp.clear();
break;
case connection::event::hello:
// Subscribes to the channels when a new connection is
// established.
exec(conn, req);
break;
default:;
}
}
}
int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
conn.get_config().enable_events = true;
conn.get_config().enable_reconnect = true;
std::thread thread{[&]() {
conn.async_run(net::detached);
ioc.run();
}};
subscriber(conn);
thread.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}

26
include/Makefile.am Normal file
View File

@@ -0,0 +1,26 @@
nobase_include_HEADERS =\
$(top_srcdir)/include/aedis/src.hpp\
$(top_srcdir)/include/aedis/error.hpp\
$(top_srcdir)/include/aedis/impl/error.ipp\
$(top_srcdir)/include/aedis/detail/net.hpp\
$(top_srcdir)/include/aedis/connection.hpp\
$(top_srcdir)/include/aedis/adapt.hpp\
$(top_srcdir)/include/aedis/detail/connection_ops.hpp\
$(top_srcdir)/include/aedis.hpp\
$(top_srcdir)/include/aedis/experimental/sync.hpp\
$(top_srcdir)/include/aedis/adapter/detail/adapters.hpp\
$(top_srcdir)/include/aedis/adapter/adapt.hpp\
$(top_srcdir)/include/aedis/adapter/detail/response_traits.hpp\
$(top_srcdir)/include/aedis/resp3/node.hpp\
$(top_srcdir)/include/aedis/resp3/compose.hpp\
$(top_srcdir)/include/aedis/resp3/detail/read_ops.hpp\
$(top_srcdir)/include/aedis/resp3/detail/parser.hpp\
$(top_srcdir)/include/aedis/resp3/detail/exec.hpp\
$(top_srcdir)/include/aedis/resp3/type.hpp\
$(top_srcdir)/include/aedis/resp3/read.hpp\
$(top_srcdir)/include/aedis/resp3/write.hpp\
$(top_srcdir)/include/aedis/resp3/request.hpp\
$(top_srcdir)/include/aedis/resp3/impl/request.ipp\
$(top_srcdir)/include/aedis/resp3/detail/impl/parser.ipp\
$(top_srcdir)/include/aedis/resp3/impl/type.ipp

View File

@@ -8,64 +8,221 @@
#define AEDIS_HPP
#include <aedis/error.hpp>
#include <aedis/command.hpp>
#include <aedis/adapt.hpp>
#include <aedis/connection.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/write.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/adapter/adapt.hpp>
// \li Support for Redis [sentinel](https://redis.io/docs/manual/sentinel).
// TODO: Reconnect support.
// TODO: Remove conflicts of the adapt function.
/** \mainpage Documentation
\tableofcontents
Useful links: \subpage any, [Changelog](CHANGELOG.md) and [Benchmarks](benchmarks/benchmarks.md).
\section Overview
Aedis is a high-level [Redis](https://redis.io/) client library
built on top of [Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html)
that provides simple and efficient communication with a Redis
server. Some of its distinctive features are
built on top of
[Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html).
Some of its distinctive features are
\li Support for the latest version of the Redis communication protocol [RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md).
\li First class support for STL containers and C++ built-in types.
\li Serialization and deserialization of your own data types.
\li Zero asymptotic allocations by means of memory reuse.
\li Health checks, back pressure and low latency.
\li Hides most of the low level asynchronous operations away from the user.
Aedis API hides most of the low level asynchronous operations
away from the user, for example, the code below pings a message to
the server
Let us start with an overview of asynchronous code.
\code
@subsection Async
The code below sends a ping command to Redis (see intro.cpp)
@code
int main()
{
net::io_context ioc;
connection db{ioc};
request req;
req.push("HELLO", 3);
req.push("PING");
req.push("QUIT");
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
net::io_context ioc;
connection db{ioc};
db.async_exec("127.0.0.1", "6379", req, adapt(resp),
[](auto ec, auto) { std::cout << ec.message() << std::endl; });
std::tuple<std::string, aedis::ignore> resp;
db.async_run(req, adapt(resp), net::detached);
ioc.run();
// Print the ping message.
std::cout << std::get<1>(resp) << std::endl;
std::cout << std::get<0>(resp) << std::endl;
}
\endcode
@endcode
The connection class maintains a healthy connection with
Redis over which users can execute their commands, without any
need of queuing. For example, to execute more than one command
@code
int main()
{
...
net::io_context ioc;
connection db{ioc};
db.async_exec(req1, adapt(resp1), handler1);
db.async_exec(req2, adapt(resp2), handler2);
db.async_exec(req3, adapt(resp3), handler3);
db.async_run(net::detached);
ioc.run();
...
}
@endcode
The `async_exec` functions above can be called from different
places in the code without knowing about each other; see for
example echo_server.cpp. Server-side pushes are supported on the
same connection where commands are executed. A typical subscriber
looks like this
(see subscriber.cpp)
@code
net::awaitable<void> reader(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "channel");
for (std::vector<node_type> resp;;) {
auto ev = co_await db->async_receive_event(aedis::adapt(resp));
switch (ev) {
case connection::event::push:
// Use resp.
resp.clear();
break;
case connection::event::hello:
// Subscribes to channels when a new connection is
// established.
co_await db->async_exec(req);
break;
default:;
}
}
}
@endcode
@subsection Sync
The `connection` class is async-only. Many users, however, need to
interact with it synchronously. This is also supported by Aedis as long
as the interaction occurs across threads, for example (see
intro_sync.cpp)
@code
int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
std::thread thread{[&]() {
conn.async_run(net::detached);
ioc.run();
}};
request req;
req.push("PING");
req.push("QUIT");
std::tuple<std::string, aedis::ignore> resp;
exec(conn, req, adapt(resp));
thread.join();
std::cout << "Response: " << std::get<0>(resp) << std::endl;
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}
@endcode
\subsection using-aedis Installation
To install and use Aedis you will need
- Boost 1.78 or greater.
- C++17. Some examples require C++20 with coroutine support.
- Redis 6 or higher. Optionally also redis-cli and Redis Sentinel.
For a simple installation run
```
# Clone the repository and checkout the latest release tag.
$ git clone --branch v0.3.0 https://github.com/mzimbres/aedis.git
$ cd aedis
# Build an example
$ g++ -std=c++17 -pthread examples/intro.cpp -I./include -I/path/boost_1_79_0/include/
```
For a proper full installation on the system run
```
# Download and unpack the latest release
$ wget https://github.com/mzimbres/aedis/releases/download/v0.3.0/aedis-0.2.1.tar.gz
$ tar -xzvf aedis-0.2.1.tar.gz
# Configure, build and install
$ ./configure --prefix=/opt/aedis-0.2.1 --with-boost=/opt/boost_1_78_0
$ sudo make install
```
To build examples and tests
```
$ make
```
@subsubsection using_aedis Using Aedis
When writing your own applications include the following header
```cpp
#include <aedis/src.hpp>
```
in no more than one source file in your applications.
@subsubsection sup-comp Supported compilers
Aedis has been tested with the following compilers
- Tested with gcc: 12, 11.
- Tested with clang: 14, 13, 11.
\subsubsection Developers
To generate the build system clone the repository and run
```
# git clone https://github.com/mzimbres/aedis.git
$ autoreconf -i
```
After that you will have a configure script that you can run as
explained above, for example, to use a compiler other than the
system compiler run
```
$ CXX=clang++-14 CXXFLAGS="-g" ./configure --with-boost=...
```
To generate release tarballs run
```
$ make distcheck
```
For a detailed comparison of Redis clients and the design
rationale behind Aedis jump to \ref why-aedis. For benchmarks see [benchmarks/benchmarks.md](https://github.com/mzimbres/aedis/blob/master/benchmarks/benchmarks.md)
\section requests Requests
@@ -87,7 +244,7 @@
// Same as above but as an iterator range.
req.push_range2("SUBSCRIBE", std::cbegin(list), std::cend(list));
// Sends a map.
// Pushes a map.
std::map<std::string, mystruct> map
{ {"key1", "value1"}
, {"key2", "value2"}
@@ -101,8 +258,6 @@
co_await db->async_exec(req, adapt(resp));
@endcode
The second argument \c adapt(resp) will be explained in \ref requests.
\subsection requests-serialization Serialization
The \c push and \c push_range functions above work with integers
@@ -116,7 +271,6 @@
void to_bulk(std::string& to, mystruct const& obj)
{
// Convert to obj string and call to_bulk.
std::string dummy = "Dummy serializaiton string.";
aedis::resp3::to_bulk(to, dummy);
}
@@ -133,7 +287,7 @@
req.push_range("HSET", "key", map);
@endcode
It is quite common to store json string in Redis for example.
The serialization.cpp example shows how to store JSON strings in Redis.
\section low-level-responses Responses
@@ -154,7 +308,7 @@
proper C++ data structure to receive it in. Fortunately, this is a
simple task for most types. The table below summarises the options
RESP3 type | C++ | Type
RESP3 type | Possible C++ type | Type
---------------|--------------------------------------------------------------|------------------
Simple-string | \c std::string | Simple
Simple-error | \c std::string | Simple
@@ -168,45 +322,42 @@
Set | \c std::vector, \c std::set, \c std::unordered_set | Aggregate
Push | \c std::vector, \c std::map, \c std::unordered_map | Aggregate
For example
@code
request req;
req.push("HELLO", 3);
req.push_range("RPUSH", "key1", vec);
req.push_range("HSET", "key2", map);
req.push("LRANGE", "key3", 0, -1);
req.push("HGETALL", "key4");
req.push("QUIT");
std::tuple<
aedis::ignore, // hello
int, // rpush
int, // hset
std::vector<T>, // lrange
std::map<U, V>, // hgetall
std::string // quit
> resp;
co_await db->async_exec(req, adapt(resp));
@endcode
The tag @c aedis::ignore can be used to ignore individual
elements in the responses. If the intention is to ignore the
response to all commands in the request use @c adapt()
@code
co_await db->async_exec(req, adapt());
@endcode
Responses that contain nested aggregates or heterogeneous data
types will be given special treatment later in \ref gen-case. As
of this writing, not all RESP3 types are used by the Redis server,
which means in practice users will be concerned with a reduced
subset of the RESP3 specification. Now let us see some examples
@code
// To ignore the response.
co_await db->async_exec(req, adapt());
// Read in a std::string e.g. get.
std::string str;
co_await db->async_exec(req, adapt(resp));
// Read in a long long e.g. rpush.
long long resp;
co_await db->async_exec(req, adapt(resp));
// Read in a std::set e.g. smembers.
std::set<T, U> resp;
co_await db->async_exec(req, adapt(resp));
// Read in a std::map e.g. hgetall.
std::map<T, U> resp;
co_await db->async_exec(req, adapt(resp));
// Read in a std::unordered_map e.g. hgetall.
std::unordered_map<T, U> resp;
co_await db->async_exec(req, adapt(resp));
// Read in a std::vector e.g. lrange.
std::vector<T> resp;
co_await db->async_exec(req, adapt(resp));
@endcode
In other words, it is straightforward: just pass the result of \c
adapt to the read function and make sure the response data type is
compatible with the data structure you are calling @c adapt(...)
with. All standard C++ containers are supported by Aedis.
subset of the RESP3 specification.
\subsection Optional
@@ -220,32 +371,14 @@
co_await db->async_exec(req, adapt(resp));
@endcode
Everything else stays the same; before accessing data, users will
have to check or assert that the optional contains a value.
Everything else stays the same.
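For instance, a minimal sketch (assuming a \c GET on a key that may not exist; everything else is as in the examples above):
@code
request req;
req.push("GET", "mykey");
boost::optional<std::string> resp;
co_await db->async_exec(req, adapt(resp));
if (resp)
   std::cout << *resp << std::endl;
@endcode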
\subsection heterogeneous_aggregates Heterogeneous aggregates
\subsection transactions Transactions
There are cases where Redis returns aggregates that
contain heterogeneous data, for example, an array that contains
integers, strings, nested sets, etc. Aedis supports reading such
aggregates in a \c std::tuple efficiently as long as they
don't contain third-order nested aggregates, e.g. an array that
contains an array of arrays. For example, to read the response to
a \c hello command we can use the following response type.
@code
using hello_type = std::tuple<
std::string, std::string,
std::string, std::string,
std::string, int,
std::string, int,
std::string, std::string,
std::string, std::string,
std::string, std::vector<std::string>>;
@endcode
Transactions are another example where this feature is useful, for
example, the response to the transaction below
To read the response to transactions we have to observe that Redis
queues the commands as they arrive and sends the responses back to
the user in a single array, in the response to the @c exec command.
For example, to read the response to this request
@code
db.send("MULTI");
@@ -255,57 +388,42 @@
db.send("EXEC");
@endcode
can be read in the following way
use the following response type
@code
using trans_type =
using aedis::ignore;
using boost::optional;
using tresp_type =
std::tuple<
boost::optional<std::string>, // get
boost::optional<std::vector<std::string>>, // lrange
boost::optional<std::map<std::string, std::string>> // hgetall
optional<std::string>, // get
optional<std::vector<std::string>>, // lrange
optional<std::map<std::string, std::string>> // hgetall
>;
std::tuple<
aedis::ignore, // multi
aedis::ignore, // get
aedis::ignore, // lrange
aedis::ignore, // hgetall
trans_type, // exec
ignore, // multi
ignore, // get
ignore, // lrange
ignore, // hgetall
tresp_type, // exec
> resp;
co_await db->async_exec(req, adapt(resp));
@endcode
Note that above we are not ignoring the response to the commands
themselves but whether they have been successfully queued. Only
after @c exec is received will Redis execute them in sequence and
send all responses together in an array.
themselves but whether they have been successfully queued. For a
complete example see containers.cpp.
\subsection Serialization
\subsection Deserialization
As mentioned in \ref requests-serialization, it is common for
users to serialize data before sending it to Redis, e.g. JSON
strings, for example
@code
sr.push("SET", "key", "{"Server": "Redis"}"); // Unquoted for readability.
sr.push("GET", "key")
@endcode
For performance and convenience reasons, we may want to avoid
receiving the response to the \c get command above as a string
just to convert it later to a e.g. deserialized json. To support
this, Aedis calls a user defined \c from_bulk function while
parsing the response. In simple terms, define your type
@code
struct mystruct {
// struct fields.
};
@endcode
and deserialize it from a string in a function \c from_bulk with
the following signature
As mentioned in \ref requests-serialization, it is common to
serialize data before sending it to Redis, e.g. to JSON strings.
For performance and convenience reasons, we may also want to
deserialize it directly into its final data structure. Aedis
supports this use case by calling a user-provided \c from_bulk
function while parsing the response. For example
@code
void from_bulk(mystruct& obj, char const* p, std::size_t size, boost::system::error_code& ec)
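{
   // A hedged sketch, not the library's implementation: deserialize the
   // bulk string [p, p + size) into obj with the library of your choice
   // and set ec on failure. The single string field used here is
   // hypothetical.
   obj.name.assign(p, size);
}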
@@ -353,7 +471,7 @@
std::vector<node<std::string>>. The vector can be seen as a
pre-order view of the response tree
(https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR).
Using it is no different that using other types
Using it is no different than using other types
@code
// Receives any RESP3 simple data type.
@@ -380,98 +498,21 @@
The examples listed below cover most use cases presented in the documentation above.
@li intro.cpp: Basic steps with Aedis.
@li intro_sync.cpp: Synchronous version of intro.cpp.
@li containers.cpp: Shows how to send and receive stl containers.
@li serialization.cpp: Shows the \c request support for serialization of user types.
@li subscriber.cpp: Shows how channel subscription works.
@li echo_server.cpp: A simple TCP echo server that users coroutines.
@li subscriber.cpp: Shows how to subscribe to a channel and how to reconnect when connection is lost.
@li subscriber_sync.cpp: Synchronous version of subscriber.cpp.
@li echo_server.cpp: A simple TCP echo server that uses coroutines.
@li chat_room.cpp: A simple chat room that uses coroutines.
\section using-aedis Using Aedis
To install and use Aedis you will need
- Boost 1.78 or greater.
- Unix Shell and Make (for linux users).
- C++14. Some examples require C++20 with coroutine support.
- Redis server.
Some examples will also require interaction with
- redis-cli: Used in one example.
- Redis Sentinel Server: used in some examples.
Aedis has been tested with the following compilers
- gcc: 7.5.0, 8.4.0, 9.3.0, 10.3.0.
- clang: 11.0.0, 10.0.0, 9.0.1, 8.0.1, 7.0.1.
\section Installation
The first thing to do is to download and unpack Aedis
```
# Download the latest release on github
$ wget https://github.com/mzimbres/aedis/releases
# Uncompress the tarball and cd into the dir
$ tar -xzvf aedis-version.tar.gz
```
If you can't use \c configure and \c make (e.g. Windows users)
add the directory where you unpacked Aedis to the
include directories in your project; otherwise run
```
# See configure --help for all options.
$ ./configure --prefix=/opt/aedis-version --with-boost=/opt/boost_1_78_0
# Install Aedis in the path specified in --prefix
$ sudo make install
```
and include the following header
```cpp
#include <aedis/src.hpp>
```
in exactly one source file in your application. At this point you
can start using Aedis. To build the examples and run the tests run
```
# Build aedis examples.
$ make examples
# Test aedis in your machine.
$ make check
```
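For a first sanity check, the sketch below is modeled on the intro.cpp example and the tests shipped with this version (it assumes a Redis server listening on the default 127.0.0.1:6379 and omits most error handling):
```cpp
#include <iostream>
#include <tuple>

#include <boost/asio.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp> // In exactly one source file.

int main()
{
   namespace net = boost::asio;
   using aedis::adapt;
   using aedis::resp3::request;
   using connection = aedis::connection<>;

   net::io_context ioc;
   connection db{ioc};

   // HELLO (and AUTH, if configured) are sent automatically by async_run.
   request req;
   req.push("PING", "Hello");
   req.push("QUIT");

   std::tuple<std::string, aedis::ignore> resp;

   db.async_exec(req, adapt(resp), [](auto ec, std::size_t) {
      if (ec) std::cerr << "exec: " << ec.message() << "\n";
   });

   // Returns only when the connection is lost or closed (here by QUIT).
   db.async_run([](auto ec) {
      if (ec) std::cerr << "run: " << ec.message() << "\n";
   });

   ioc.run();
   std::cout << std::get<0>(resp) << "\n";
}
```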
\section Developers
To generate the build system run
```
$ autoreconf -i
```
After that you will have a configure script
that you can run as explained above. For example, to use a
compiler other than the system compiler, run
```
$ CC=/opt/gcc-10.2.0/bin/gcc-10.2.0 CXX=/opt/gcc-10.2.0/bin/g++-10.2.0 CXXFLAGS="-g -Wall -Werror" ./configure ...
$ make distcheck
```
\section why-aedis Why Aedis
At the time of this writing there are seventeen Redis clients
listed in the [official](https://redis.io/docs/clients/#cpp) list.
With so many clients available it is natural for users to ask
themselves why yet another one. In this section I will try
to compare Aedis to the most popular clients and why we need
to compare Aedis with the most popular clients and why we need
Aedis. Note, however, that this is ongoing work, as comparing
clients objectively is difficult and time-consuming.
@@ -484,8 +525,8 @@
not support
@li RESP3. Without RESP3 it is impossible to support some important Redis features like client-side caching, among other things.
@li The Asio asynchronous model.
@li Reading response diretly in user data structures avoiding temporaries.
@li Coroutines.
@li Reading responses directly in user data structures avoiding temporaries.
@li Error handling with error-code and exception overloads.
@li Health checks.
@@ -531,11 +572,11 @@
Some of the problems with this API are
@li Heterogeneous treatment of commands, pipelines and transaction.
@li Heterogeneous treatment of commands, pipelines and transactions. This makes auto-pipelining impossible.
@li Any API that sends individual commands has a very restricted scope of usability and should be avoided for performance reasons.
@li The API imposes exceptions on users; no error-code overload is provided.
@li No way to reuse the buffer for new calls to e.g. \c redis.get in order to avoid further dynamic memory allocations.
@li Error handling of resolve and connection no clear.
@li Error handling of resolve and connection not clear.
According to the documentation, pipelines in redis-plus-plus have
the following characteristics
@@ -546,13 +587,13 @@
This is clearly a downside of the API as pipelines should be the
default way of communicating and not an exception; paying such a
high price for each pipeline imposes a severe cost in performance.
Transactions also suffer from the very same problem
Transactions also suffer from the very same problem.
> NOTE: Creating a Transaction object is NOT cheap, since it
> creates a new connection.
In Aedis there is no difference between sending one command, a
pipeline or a transaction because creating the request is decoupled
pipeline or a transaction because requests are decoupled
from the IO objects.
> redis-plus-plus also supports async interface, however, async
@@ -587,11 +628,6 @@
@li Richard Hodges ([madmongo1](https://github.com/madmongo1)): For helping me with Asio and the design of asynchronous programs in general.
@li Vinícius dos Santos Oliveira ([vinipsmaker](https://github.com/vinipsmaker)): For useful discussion about how Aedis consumes buffers in the read operation (among other things).
@li Petr Dannhofer ([Eddie-cz](https://github.com/Eddie-cz)): For helping me understand how the `AUTH` and `HELLO` commands can influence each other.
\section Reference
See \subpage any.
*/
/** \defgroup any Reference
@@ -599,4 +635,14 @@
* This page contains the documentation of all user facing code.
*/
// Add sentinel support as described in
//
// - https://redis.io/docs/manual/sentinel.
// - https://redis.io/docs/reference/sentinel-clients.
//
// Avoid conflicts between
//
// - aedis::adapt
// - aedis::resp3::adapt.
#endif // AEDIS_HPP

View File

@@ -20,6 +20,17 @@
namespace aedis {
/** @brief A type that ignores responses.
*
* For example
*
* @code
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
* @endcode
*
* will cause only the second tuple type to be parsed; the others
* will be ignored.
*/
using ignore = adapter::detail::ignore;
namespace detail {
@@ -27,11 +38,13 @@ namespace detail {
struct ignore_adapter {
void
operator()(
std::size_t i,
resp3::node<boost::string_view> const& nd,
boost::system::error_code& ec)
std::size_t,
resp3::node<boost::string_view> const&,
boost::system::error_code&)
{
}
auto supported_response_size() const noexcept { return std::size_t(-1);}
};
template <class Tuple>
@@ -47,9 +60,11 @@ private:
public:
static_adapter(Tuple& r = nullptr)
{
adapter::detail::assigner<std::tuple_size<Tuple>::value - 1>::assign(adapters_, r);
adapter::detail::assigner<size - 1>::assign(adapters_, r);
}
auto supported_response_size() const noexcept { return size;}
void
operator()(
std::size_t i,
@@ -71,9 +86,11 @@ private:
public:
vector_adapter(Vector& v) : adapter_{adapter::adapt(v)} { }
auto supported_response_size() const noexcept { return std::size_t(-1);}
void
operator()(
std::size_t i,
std::size_t,
resp3::node<boost::string_view> const& nd,
boost::system::error_code& ec)
{
@@ -113,11 +130,21 @@ struct response_traits<std::tuple<Ts...>> {
} // detail
/** @brief Creates an adapter that ignores responses.
*
* This function can be used to create adapters that ignore
* responses. As a result it can improve performance.
*/
auto adapt() noexcept
{
return detail::response_traits<void>::adapt();
}
/** @brief Adapts a type to be used as a response.
*
* The type T can be any STL container, any integer type and
* \c std::string
*/
template<class T>
auto adapt(T& t) noexcept
{

View File

@@ -63,7 +63,7 @@ from_bulk(
void from_bulk(
bool& t,
boost::string_view sv,
boost::system::error_code& ec)
boost::system::error_code&)
{
t = *sv.data() == 't';
}

View File

@@ -90,17 +90,15 @@ struct assigner<0> {
}
};
// TODO: I am not sure we need the mp_unique below.
template <class Tuple>
class static_aggregate_adapter {
private:
using adapters_array_type =
std::array<
boost::mp11::mp_unique<
boost::mp11::mp_rename<
boost::mp11::mp_transform<
adapter_t, Tuple>,
boost::variant2::variant>>,
boost::mp11::mp_rename<
boost::mp11::mp_transform<
adapter_t, Tuple>,
boost::variant2::variant>,
std::tuple_size<Tuple>::value>;
std::size_t i_ = 0;

View File

@@ -14,6 +14,7 @@
#include <memory>
#include <type_traits>
#include <boost/assert.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/experimental/channel.hpp>
@@ -24,10 +25,8 @@
namespace aedis {
// https://redis.io/docs/reference/sentinel-clients
/** \brief A high level Redis connection class.
* \ingroup any
/** @brief A high level Redis connection class.
* @ingroup any
*
* This class keeps a healthy connection to the Redis instance where
* commands can be sent at any time. For more details, please see the
@@ -44,10 +43,27 @@ public:
using next_layer_type = AsyncReadWriteStream;
using default_completion_token_type = boost::asio::default_completion_token_t<executor_type>;
using channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, std::size_t)>;
using clock_type = std::chrono::steady_clock;
using clock_traits_type = boost::asio::wait_traits<clock_type>;
using timer_type = boost::asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
using resolver_type = boost::asio::ip::basic_resolver<boost::asio::ip::tcp, executor_type>;
/** @brief Configuration parameters.
/** @brief Connection configuration parameters.
*/
struct config {
/// The Redis server address.
std::string host = "127.0.0.1";
/// The Redis server port.
std::string port = "6379";
/// Username if authentication is required.
std::string username;
/// Password if authentication is required.
std::string password;
/// Timeout of the resolve operation.
std::chrono::milliseconds resolve_timeout = std::chrono::seconds{10};
@@ -57,19 +73,42 @@ public:
/// Time interval between ping operations.
std::chrono::milliseconds ping_interval = std::chrono::seconds{1};
/// The maximum size allowed of read operations.
/// Time waited before trying a reconnection (see enable reconnect).
std::chrono::milliseconds reconnect_interval = std::chrono::seconds{1};
/// The maximum size allowed on read operations.
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
/// Whether to coalesce requests or not.
/// Whether to coalesce requests (see [pipelines](https://redis.io/topics/pipelining)).
bool coalesce_requests = true;
/// Enable events
bool enable_events = false;
/// Enable automatic reconnection (see also reconnect_interval).
bool enable_reconnect = false;
};
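// A hedged configuration sketch (mirroring the tests shipped with this
// version); tweak the fields above before constructing the connection:
//
//    connection<>::config cfg;
//    cfg.host = "127.0.0.1";
//    cfg.port = "6379";
//    cfg.username = "user";        // Only needed if the server requires AUTH.
//    cfg.password = "secret";      // Placeholder credentials.
//    cfg.enable_events = true;     // Deliver events through async_receive_event.
//    cfg.enable_reconnect = true;  // Retry after reconnect_interval.
//
//    boost::asio::io_context ioc;
//    connection<> db{ioc, cfg};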
/** \brief Constructor.
/// Events communicated through \c async_receive_event.
enum class event {
/// The address has been successfully resolved.
resolve,
/// Connected to the Redis server.
connect,
/// Success sending AUTH and HELLO.
hello,
/// A push event has been received.
push,
/// Used internally.
invalid
};
/** \brief Construct a connection from an executor.
*
* \param ex The executor.
* \param cfg Configuration parameters.
*/
connection(boost::asio::any_io_executor ex, config cfg = config{})
connection(executor_type ex, config cfg = config{})
: resv_{ex}
, ping_timer_{ex}
, check_idle_timer_{ex}
@@ -83,6 +122,11 @@ public:
read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
/** \brief Construct a connection from an io_context.
*
* \param ioc The executor.
* \param cfg Configuration parameters.
*/
connection(boost::asio::io_context& ioc, config cfg = config{})
: connection(ioc.get_executor(), cfg)
{ }
@@ -110,12 +154,10 @@ public:
* connection::config::ping_interval.
*
* \li Starts reading from the socket and delivering events to the
* request started with \c async_exec or \c async_read_push.
* request started with \c async_exec and \c async_receive_event.
*
* For an example see echo_server.cpp.
*
* \param host Redis address.
* \param port Redis port.
* \param token Completion token.
*
* The completion token must have the following signature
@@ -127,33 +169,29 @@ public:
* \return This function returns only when there is an error.
*/
template <class CompletionToken = default_completion_token_type>
auto
async_run(
boost::string_view host = "127.0.0.1",
boost::string_view port = "6379",
CompletionToken token = CompletionToken{})
auto async_run(CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::run_op<connection>{this, host, port}, token, resv_);
>(detail::run_op<connection>{this}, token, resv_);
}
/** @brief Executes a command on the redis server.
/** @brief Executes a command on the redis server asynchronously.
*
* \param req Request object.
* \param adapter Response adapter.
* \param token Asio completion token.
*
* For an example see containers.cpp. The completion token must
* For an example see echo_server.cpp. The completion token must
* have the following signature
*
* @code
* void f(boost::system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the response that has
* just been read.
* Where the second parameter is the size of the response in
* bytes.
*/
template <
class Adapter = detail::response_traits<void>::adapter_type,
@@ -163,20 +201,20 @@ public:
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
{
BOOST_ASSERT_MSG(req.size() <= adapter.supported_response_size(), "Request and adapter have incompatible sizes.");
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<connection, Adapter>{this, &req, adapter}, token, resv_);
}
/** @brief Connects and executes a single command.
/** @brief Connects and executes a request asynchronously.
*
* Combines \c async_run and the other \c async_exec overload in a
* single function. This function is useful for users that want to
* send a single request to the server and then close the connection.
*
* \param host Address of the Redis server.
* \param port Port of the Redis server.
* \param req Request object.
* \param adapter Response adapter.
* \param token Asio completion token.
@@ -187,15 +225,12 @@ public:
* void f(boost::system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the response that has
* just been read.
* Where the second parameter is the size of the response in bytes.
*/
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
auto async_exec(
boost::string_view host,
boost::string_view port,
auto async_run(
resp3::request const& req,
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
@@ -204,10 +239,10 @@ public:
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::runexec_op<connection, Adapter>
{this, host, port, &req, adapter}, token, resv_);
{this, &req, adapter}, token, resv_);
}
/** @brief Receives Redis unsolicited events like pushes.
/** @brief Receives unsolicited events asynchronously.
*
* Users that expect unsolicited events should call this function
* in a loop. If an unsolicited events comes in and there is no
@@ -220,7 +255,7 @@ public:
* have the following signature
*
* @code
* void f(boost::system::error_code, std::size_t);
* void f(boost::system::error_code, event);
* @endcode
*
* Where the second parameter is the size of the response that has
@@ -229,7 +264,7 @@ public:
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
auto async_read_push(
auto async_receive_event(
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
{
@@ -242,15 +277,15 @@ public:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::read_push_op<connection, decltype(f)>{this, f}, token, resv_);
, void(boost::system::error_code, event)
>(detail::receive_op<connection, decltype(f)>{this, f}, token, resv_);
}
/** @brief Cancel all pending request.
*
* \returns The number of requests that have been canceled.
*/
std::size_t cancel_requests()
std::size_t cancel_execs()
{
for (auto& e: reqs_) {
e->stop = true;
@@ -267,26 +302,31 @@ public:
* Calling this function will cause \c async_run to return. It is
* safe to try a reconnect after that; when that happens, all
* pending requests will be automatically sent.
*
* Calling this function will cause @c async_receive_event to return
* with @c boost::asio::experimental::channel_errc::channel_cancelled.
*
* Note however that the prefered way to close a connection is to
* send a \c quit command if you are actively closing it.
* Otherwise an unresponsive Redis server will cause the
* idle-checks to kick in and lead to \c async_run
* returning with idle_timeout.
* \remarks
*
* The preferred way to close a connection is to send a \c quit
* command if you are actively closing it. Otherwise an
* unresponsive Redis server will cause the idle-checks to kick in
* and lead to \c async_run returning with idle_timeout.
*
*/
void cancel_run()
{
socket_->close();
if (socket_)
socket_->close();
read_timer_.cancel();
check_idle_timer_.cancel();
writer_timer_.cancel();
ping_timer_.cancel();
push_channel_.cancel();
// Cancel own pings if there are any waiting.
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !ptr->req->is_internal();
return !ptr->req->close_on_run_completion;
});
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
@@ -297,10 +337,22 @@ public:
reqs_.erase(point, std::end(reqs_));
}
/// Cancels the event receiver.
void cancel_event_receiver()
{
push_channel_.cancel();
}
/// Get the config object.
config& get_config() noexcept { return cfg_;}
/// Gets the config object.
config const& get_config() const noexcept { return cfg_;}
private:
struct req_info {
req_info(boost::asio::any_io_executor ex) : timer{ex} {}
boost::asio::steady_timer timer;
req_info(executor_type ex) : timer{ex} {}
timer_type timer;
resp3::request const* req = nullptr;
std::size_t cmds = 0;
bool stop = false;
@@ -310,11 +362,12 @@ private:
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
using reqs_type = std::deque<std::shared_ptr<req_info>>;
template <class T, class U> friend struct detail::read_push_op;
template <class T, class U> friend struct detail::receive_op;
template <class T> friend struct detail::reader_op;
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::ping_op;
template <class T> friend struct detail::run_op;
template <class T> friend struct detail::run_one_op;
template <class T, class U> friend struct detail::exec_op;
template <class T, class U> friend struct detail::exec_read_op;
template <class T, class U> friend struct detail::runexec_op;
@@ -322,11 +375,21 @@ private:
template <class T> friend struct detail::resolve_with_timeout_op;
template <class T> friend struct detail::check_idle_op;
template <class T> friend struct detail::start_op;
template <class T> friend struct detail::send_receive_op;
template <class CompletionToken = default_completion_token_type>
auto async_run_one(CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::run_one_op<connection>{this}, token, resv_);
}
void cancel_push_requests()
{
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !(ptr->written && ptr->req->commands() == 0);
return !(ptr->written && ptr->req->size() == 0);
});
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
@@ -347,16 +410,12 @@ private:
{ return boost::asio::dynamic_buffer(read_buffer_, cfg_.max_read_size); }
template <class CompletionToken>
auto
async_resolve_with_timeout(
boost::string_view host,
boost::string_view port,
CompletionToken&& token)
auto async_resolve_with_timeout(CompletionToken&& token)
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::resolve_with_timeout_op<connection>{this, host, port},
>(detail::resolve_with_timeout_op<connection>{this},
token, resv_);
}
@@ -369,7 +428,6 @@ private:
>(detail::connect_with_timeout_op<connection>{this}, token, resv_);
}
// Loops on async_read_with_timeout described above.
template <class CompletionToken>
auto reader(CompletionToken&& token)
{
@@ -435,20 +493,18 @@ private:
auto const size = cfg_.coalesce_requests ? reqs_.size() : 1;
for (auto i = 0UL; i < size; ++i) {
write_buffer_ += reqs_.at(i)->req->payload();
cmds_ += reqs_.at(i)->req->commands();
cmds_ += reqs_.at(i)->req->size();
reqs_.at(i)->written = true;
}
}
using channel_type = boost::asio::experimental::channel<void(boost::system::error_code, std::size_t)>;
// IO objects
boost::asio::ip::tcp::resolver resv_;
resolver_type resv_;
std::shared_ptr<AsyncReadWriteStream> socket_;
boost::asio::steady_timer ping_timer_;
boost::asio::steady_timer check_idle_timer_;
boost::asio::steady_timer writer_timer_;
boost::asio::steady_timer read_timer_;
timer_type ping_timer_;
timer_type check_idle_timer_;
timer_type writer_timer_;
timer_type read_timer_;
channel_type push_channel_;
config cfg_;
@@ -456,6 +512,7 @@ private:
std::string write_buffer_;
std::size_t cmds_ = 0;
reqs_type reqs_;
event last_event_ = event::invalid;
// Last time we received data.
time_point_type last_data_;
@@ -466,6 +523,29 @@ private:
resp3::request req_;
};
/// Converts a connection event to a string.
template <class T>
char const* to_string(typename connection<T>::event e)
{
using event_type = typename connection<T>::event;
switch (e) {
case event_type::resolve: return "resolve";
case event_type::connect: return "connect";
case event_type::hello: return "hello";
case event_type::push: return "push";
case event_type::invalid: return "invalid";
default: BOOST_ASSERT_MSG(false, "to_string: unhandled event."); return "invalid";
}
}
/// Writes a connection event to the stream.
template <class T>
std::ostream& operator<<(std::ostream& os, typename connection<T>::event e)
{
os << to_string(e);
return os;
}
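// A hedged usage sketch (modeled on the tests shipped with this version):
// with config::enable_events set, events can be consumed in a loop and used,
// for example, to re-subscribe after an automatic reconnection.
//
//    net::awaitable<void> event_loop(std::shared_ptr<connection<>> db)
//    {
//       for (;;) {
//          auto ev = co_await db->async_receive_event(adapt(), net::use_awaitable);
//          if (ev == connection<>::event::hello) {
//             // Connection (re)established and ready: re-issue SUBSCRIBE etc.
//          }
//       }
//    }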
} // aedis
#endif // AEDIS_CONNECTION_HPP

View File

@@ -20,6 +20,7 @@
#include <aedis/error.hpp>
#include <aedis/detail/net.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/detail/exec.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/write.hpp>
@@ -36,12 +37,12 @@ namespace detail {
template <class Conn>
struct connect_with_timeout_op {
Conn* conn;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::endpoint const& ep = {})
, boost::asio::ip::tcp::endpoint const& = {})
{
reenter (coro)
{
@@ -56,9 +57,7 @@ struct connect_with_timeout_op {
template <class Conn>
struct resolve_with_timeout_op {
Conn* conn;
boost::string_view host;
boost::string_view port;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
@@ -68,7 +67,10 @@ struct resolve_with_timeout_op {
reenter (coro)
{
conn->ping_timer_.expires_after(conn->cfg_.resolve_timeout);
yield aedis::detail::async_resolve(conn->resv_, conn->ping_timer_, host, port, std::move(self));
yield
aedis::detail::async_resolve(
conn->resv_, conn->ping_timer_,
conn->cfg_.host, conn->cfg_.port, std::move(self));
conn->endpoints_ = res;
self.complete(ec);
}
@@ -76,11 +78,11 @@ struct resolve_with_timeout_op {
};
template <class Conn, class Adapter>
struct read_push_op {
Conn* conn;
struct receive_op {
Conn* conn = nullptr;
Adapter adapter;
std::size_t read_size;
boost::asio::coroutine coro;
std::size_t read_size = 0;
boost::asio::coroutine coro{};
template <class Self>
void
@@ -92,22 +94,24 @@ struct read_push_op {
{
yield conn->push_channel_.async_receive(std::move(self));
if (ec) {
self.complete(ec, 0);
self.complete(ec, Conn::event::invalid);
return;
}
BOOST_ASSERT(conn->socket_ != nullptr);
yield resp3::async_read(*conn->socket_, conn->make_dynamic_buffer(), adapter, std::move(self));
if (ec) {
conn->cancel_run();
self.complete(ec, 0);
return;
}
if (conn->last_event_ == Conn::event::push) {
BOOST_ASSERT(conn->socket_ != nullptr);
yield resp3::async_read(*conn->socket_, conn->make_dynamic_buffer(), adapter, std::move(self));
if (ec) {
conn->cancel_run();
self.complete(ec, Conn::event::invalid);
return;
}
read_size = n;
read_size = n;
}
yield conn->push_channel_.async_send({}, 0, std::move(self));
self.complete(ec, read_size);
self.complete(ec, conn->last_event_);
return;
}
}
@@ -120,7 +124,7 @@ struct exec_read_op {
std::size_t cmds = 0;
std::size_t read_size = 0;
std::size_t index = 0;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void
@@ -150,15 +154,18 @@ struct exec_read_op {
}
// If the next request is a push we have to hand it over to
// the read_push_op wait for it to be done and continue.
// the receive_op, wait for it to be done and continue.
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push) {
conn->last_event_ = Conn::event::push;
yield async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
// Notice we don't call cancel_run() as that is the
// responsibility of the read_push_op.
// responsibility of the receive_op.
self.complete(ec, 0);
return;
}
continue;
}
//-----------------------------------
@@ -193,12 +200,12 @@ template <class Conn, class Adapter>
struct exec_op {
using req_info_type = typename Conn::req_info;
Conn* conn;
resp3::request const* req;
Adapter adapter;
std::shared_ptr<req_info_type> info;
Conn* conn = nullptr;
resp3::request const* req = nullptr;
Adapter adapter{};
std::shared_ptr<req_info_type> info = nullptr;
std::size_t read_size = 0;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void
@@ -211,7 +218,7 @@ struct exec_op {
info = std::allocate_shared<req_info_type>(boost::asio::get_associated_allocator(self), conn->resv_.get_executor());
info->timer.expires_at(std::chrono::steady_clock::time_point::max());
info->req = req;
info->cmds = req->commands();
info->cmds = req->size();
info->stop = false;
conn->add_request_info(info);
@@ -225,13 +232,14 @@ struct exec_op {
BOOST_ASSERT(conn->socket_->is_open());
if (req->commands() == 0) {
if (req->size() == 0) {
self.complete({}, 0);
return;
}
BOOST_ASSERT(!conn->reqs_.empty());
BOOST_ASSERT(conn->reqs_.front() != nullptr);
BOOST_ASSERT(conn->cmds_ != 0);
yield conn->async_exec_read(adapter, conn->reqs_.front()->cmds, std::move(self));
if (ec) {
self.complete(ec, 0);
@@ -244,9 +252,9 @@ struct exec_op {
conn->reqs_.pop_front();
if (conn->cmds_ == 0) {
conn->read_timer_.cancel();
conn->read_timer_.cancel_one();
if (!conn->reqs_.empty())
conn->writer_timer_.cancel();
conn->writer_timer_.cancel_one();
} else {
BOOST_ASSERT(!conn->reqs_.empty());
conn->reqs_.front()->timer.cancel_one();
@@ -260,13 +268,13 @@ struct exec_op {
template <class Conn>
struct ping_op {
Conn* conn;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void
operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t read_size = 0)
, std::size_t = 0)
{
reenter (coro) for (;;)
{
@@ -280,7 +288,7 @@ struct ping_op {
conn->req_.clear();
conn->req_.push("PING");
conn->req_.set_internal();
conn->req_.close_on_run_completion = true;
yield conn->async_exec(conn->req_, aedis::adapt(), std::move(self));
if (ec) {
// Notice we don't report error but let the idle check
@@ -295,7 +303,7 @@ struct ping_op {
template <class Conn>
struct check_idle_op {
Conn* conn;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
@@ -327,7 +335,7 @@ struct check_idle_op {
template <class Conn>
struct start_op {
Conn* conn;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
@@ -373,31 +381,84 @@ struct start_op {
};
template <class Conn>
struct run_op {
struct run_one_op {
Conn* conn;
boost::string_view host;
boost::string_view port;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
void operator()(
Self& self,
boost::system::error_code ec = {},
std::size_t = 0)
{
reenter (coro)
{
yield conn->async_resolve_with_timeout(host, port, std::move(self));
yield conn->async_resolve_with_timeout(std::move(self));
if (ec) {
conn->cancel_run();
self.complete(ec);
return;
}
if (conn->cfg_.enable_events) {
conn->last_event_ = Conn::event::resolve;
yield async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
}
conn->socket_ = std::make_shared<typename Conn::next_layer_type>(conn->resv_.get_executor());
yield conn->async_connect_with_timeout(std::move(self));
if (ec) {
conn->cancel_run();
self.complete(ec);
return;
}
if (conn->cfg_.enable_events) {
conn->last_event_ = Conn::event::connect;
yield async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
}
conn->req_.clear();
if (!std::empty(conn->cfg_.username) && !std::empty(conn->cfg_.password))
conn->req_.push("AUTH", conn->cfg_.username, conn->cfg_.password);
conn->req_.push("HELLO", "3");
conn->ping_timer_.expires_after(conn->cfg_.ping_interval);
yield
async_exec(
*conn->socket_,
conn->ping_timer_,
conn->req_,
adapter::adapt(),
conn->make_dynamic_buffer(),
std::move(self)
);
if (ec) {
conn->cancel_run();
self.complete(ec);
return;
}
if (conn->cfg_.enable_events) {
conn->last_event_ = Conn::event::hello;
yield async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
}
std::for_each(std::begin(conn->reqs_), std::end(conn->reqs_), [](auto const& ptr) {
return ptr->written = false;
});
@@ -408,10 +469,40 @@ struct run_op {
}
};
template <class Conn>
struct run_op {
Conn* conn;
boost::asio::coroutine coro{};
template <class Self>
void operator()(
Self& self,
boost::system::error_code ec = {},
std::size_t = 0)
{
reenter (coro) for(;;)
{
yield conn->async_run_one(std::move(self));
if (!conn->cfg_.enable_reconnect) {
self.complete(ec);
return;
}
// Consider communicating the return of async_run_one as an
// event here.
conn->ping_timer_.expires_after(conn->cfg_.reconnect_interval);
yield conn->ping_timer_.async_wait(std::move(self));
}
}
};
template <class Conn>
struct writer_op {
Conn* conn;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
@@ -459,7 +550,7 @@ struct writer_op {
template <class Conn>
struct reader_op {
Conn* conn;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
@@ -501,6 +592,7 @@ struct reader_op {
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push
|| conn->reqs_.empty()
|| (!conn->reqs_.empty() && conn->reqs_.front()->cmds == 0)) {
conn->last_event_ = Conn::event::push;
yield async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
self.complete(ec);
@@ -510,7 +602,7 @@ struct reader_op {
BOOST_ASSERT(conn->cmds_ != 0);
BOOST_ASSERT(!conn->reqs_.empty());
BOOST_ASSERT(conn->reqs_.front()->cmds != 0);
conn->reqs_.front()->timer.cancel();
conn->reqs_.front()->timer.cancel_one();
yield conn->read_timer_.async_wait(std::move(self));
if (!conn->socket_->is_open()) {
self.complete({});
@@ -525,11 +617,9 @@ struct reader_op {
template <class Conn, class Adapter>
struct runexec_op {
Conn* conn;
boost::string_view host;
boost::string_view port;
resp3::request const* req;
resp3::request const* req = nullptr;
Adapter adapter;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
@@ -540,20 +630,26 @@ struct runexec_op {
{
reenter (coro)
{
req->close_on_run_completion = true;
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return conn->async_run(host, port, token);},
[this](auto token) { return conn->async_run(token);},
[this](auto token) { return conn->async_exec(*req, adapter, token);}
).async_wait(
boost::asio::experimental::wait_for_one_error(),
std::move(self));
if (ec2) {
self.complete(ec2, n);
} else {
// If there was no error in the async_exec we complete
// with the async_run error, if any.
self.complete(ec1, n);
switch (order[0]) {
case 0:
{
self.complete(ec1, n);
} break;
case 1:
{
self.complete(ec2, n);
} break;
default: BOOST_ASSERT(false);
}
}
}

View File

@@ -18,6 +18,9 @@
namespace aedis {
namespace detail {
template <class Executor>
using conn_timer_t = boost::asio::basic_waitable_timer<std::chrono::steady_clock, boost::asio::wait_traits<std::chrono::steady_clock>, Executor>;
#include <boost/asio/yield.hpp>
template <
@@ -27,9 +30,9 @@ template <
>
struct connect_op {
boost::asio::basic_socket<Protocol, Executor>* socket;
boost::asio::steady_timer* timer;
conn_timer_t<Executor>* timer;
EndpointSequence* endpoints;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
@@ -77,12 +80,13 @@ struct connect_op {
}
};
template <class Resolver, class Timer>
struct resolve_op {
boost::asio::ip::tcp::resolver* resv;
boost::asio::steady_timer* timer;
Resolver* resv;
Timer* timer;
boost::string_view host;
boost::string_view port;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
@@ -129,12 +133,12 @@ struct resolve_op {
template <class Channel>
struct send_receive_op {
Channel* channel;
boost::asio::coroutine coro;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
, std::size_t = 0)
{
reenter (coro)
{
@@ -160,7 +164,7 @@ template <
>
auto async_connect(
boost::asio::basic_socket<Protocol, Executor>& socket,
boost::asio::steady_timer& timer,
conn_timer_t<Executor>& timer,
EndpointSequence ep,
CompletionToken&& token = boost::asio::default_completion_token_t<Executor>{})
{
@@ -172,20 +176,24 @@ auto async_connect(
}
template <
class Resolver,
class Timer,
class CompletionToken =
boost::asio::default_completion_token_t<boost::asio::ip::tcp::resolver::executor_type>
boost::asio::default_completion_token_t<typename Resolver::executor_type>
>
auto async_resolve(
boost::asio::ip::tcp::resolver& resv,
boost::asio::steady_timer& timer,
Resolver& resv,
Timer& timer,
boost::string_view host,
boost::string_view port,
CompletionToken&& token = CompletionToken{})
{
// TODO: Use static_assert to check Resolver::executor_type and
// Timer::executor_type are the same.
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, boost::asio::ip::tcp::resolver::results_type)
>(resolve_op{&resv, &timer, host, port}, token, resv, timer);
>(resolve_op<Resolver, Timer>{&resv, &timer, host, port}, token, resv, timer);
}
template <

View File

@@ -0,0 +1,151 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_EXPERIMENTAL_SYNC_HPP
#define AEDIS_EXPERIMENTAL_SYNC_HPP
#include <aedis/adapt.hpp>
#include <aedis/connection.hpp>
#include <aedis/resp3/request.hpp>
namespace aedis {
namespace experimental {
namespace detail {
struct sync {
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
};
} // detail
/** @brief Executes a command.
* @ingroup any
*
* This function will block until execution completes.
*
* @param conn The connection.
* @param req The request.
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes of the response.
*/
template <class Connection, class ResponseAdapter>
std::size_t
exec(
Connection& conn,
resp3::request const& req,
ResponseAdapter adapter,
boost::system::error_code& ec)
{
detail::sync sh;
std::size_t res = 0;
auto f = [&conn, &ec, &res, &sh, &req, adapter]()
{
conn.async_exec(req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Executes a command.
* @ingroup any
*
* This function will block until execution completes.
*
* @param conn The connection.
* @param req The request.
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes of the response.
*/
template <
class Connection,
class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
std::size_t
exec(
Connection& conn,
resp3::request const& req,
ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = exec(conn, req, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
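// A hedged usage sketch (modeled on the intro_sync.cpp example referenced in
// the documentation): run the connection on its own thread and call exec()
// from the calling thread; exec() blocks until the response has been read.
//
//    boost::asio::io_context ioc{1};
//    aedis::connection<> conn{ioc};
//
//    std::thread t{[&]() {
//       conn.async_run([](auto) {});
//       ioc.run();
//    }};
//
//    aedis::resp3::request req;
//    req.push("PING", "Hello");
//    req.push("QUIT");
//
//    std::tuple<std::string, aedis::ignore> resp;
//    aedis::experimental::exec(conn, req, aedis::adapt(resp));
//    t.join();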
/** @brief Receives server pushes synchronously.
* @ingroup any
*
* This function will block until an event has been received.
*
* @param conn The connection.
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The event that has been received.
*/
template <class Connection, class ResponseAdapter>
auto receive_event(
Connection& conn,
ResponseAdapter adapter,
boost::system::error_code& ec)
{
using event_type = typename Connection::event;
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
event_type ev = event_type::invalid;
auto f = [&conn, &ec, &ev, &mutex, &cv, &ready, adapter]()
{
conn.async_receive_event(adapter, [&cv, &mutex, &ready, &ev, &ec](auto const& ecp, event_type evp) {
std::unique_lock ul(mutex);
ec = ecp;
ev = evp;
ready = true;
ul.unlock();
cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f));
std::unique_lock lk(mutex);
cv.wait(lk, [&ready]{return ready;});
return ev;
}
/// TODO
template <
class Connection,
class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
auto receive_event(
Connection& conn,
ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = receive_event(conn, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
} // experimental
} // aedis
#endif // AEDIS_EXPERIMENTAL_SYNC_HPP

View File

@@ -34,6 +34,9 @@ constexpr char separator[] = "\r\n";
* }
* @endcode
*
* @param to Storage into which data will be copied.
* @param data Data that will be serialized and stored in @c to.
*
* See more in \ref requests-serialization.
*/
template <class Request>

View File

@@ -0,0 +1,184 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_RESP3_EXEC_HPP
#define AEDIS_RESP3_EXEC_HPP
#include <boost/assert.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <aedis/error.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/request.hpp>
namespace aedis {
namespace resp3 {
namespace detail {
#include <boost/asio/yield.hpp>
template <
class AsyncStream,
class Adapter,
class DynamicBuffer
>
struct exec_op {
AsyncStream* socket = nullptr;
request const* req = nullptr;
Adapter adapter;
DynamicBuffer dbuf{};
std::size_t n_cmds = 0;
std::size_t size = 0;
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro) for (;;)
{
if (req) {
yield
boost::asio::async_write(
*socket,
boost::asio::buffer(req->payload()),
std::move(self));
if (ec || n_cmds == 0) {
self.complete(ec, n);
return;
}
req = nullptr;
}
yield resp3::async_read(*socket, dbuf, adapter, std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
size += n;
if (--n_cmds == 0) {
self.complete(ec, size);
return;
}
}
}
};
template <
class AsyncStream,
class Adapter,
class DynamicBuffer,
class CompletionToken = boost::asio::default_completion_token_t<typename AsyncStream::executor_type>
>
auto async_exec(
AsyncStream& socket,
request const& req,
Adapter adapter,
DynamicBuffer dbuf,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<AsyncStream, Adapter, DynamicBuffer>
{&socket, &req, adapter, dbuf, req.size()}, token, socket);
}
template <
class AsyncStream,
class Timer,
class Adapter,
class DynamicBuffer
>
struct exec_with_timeout_op {
AsyncStream* socket = nullptr;
Timer* timer = nullptr;
request const* req = nullptr;
Adapter adapter;
DynamicBuffer dbuf{};
boost::asio::coroutine coro{};
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, std::size_t n = 0
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return detail::async_exec(*socket, *req, adapter, dbuf, token);},
[this](auto token) { return timer->async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, 0);
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(aedis::error::idle_timeout, 0);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, n);
}
}
};
#include <boost/asio/unyield.hpp>
template <
class AsyncStream,
class Timer,
class Adapter,
class DynamicBuffer,
class CompletionToken = boost::asio::default_completion_token_t<typename AsyncStream::executor_type>
>
auto async_exec(
AsyncStream& socket,
Timer& timer,
request const& req,
Adapter adapter,
DynamicBuffer dbuf,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_with_timeout_op<AsyncStream, Timer, Adapter, DynamicBuffer>
{&socket, &timer, &req, adapter, dbuf}, token, socket, timer);
}
} // detail
} // resp3
} // aedis
#endif // AEDIS_RESP3_EXEC_HPP

View File

@@ -37,7 +37,7 @@ private:
parser<ResponseAdapter> parser_;
std::size_t consumed_;
std::size_t buffer_size_;
boost::asio::coroutine coro_;
boost::asio::coroutine coro_{};
public:
parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter)

View File

@@ -4,9 +4,11 @@
* accompanying file LICENSE.txt)
*/
#include <aedis/command.hpp>
#include <aedis/resp3/request.hpp>
namespace aedis {
namespace resp3 {
namespace detail {
bool has_push_response(boost::string_view cmd)
{
@@ -16,4 +18,6 @@ bool has_push_response(boost::string_view cmd)
return false;
}
} // detail
} // resp3
} // aedis

View File

@@ -62,6 +62,9 @@ std::string to_string(node<String> const& in)
/** \brief Compares a node for equality.
* \ingroup any
*
* @param a Left hand side node object.
* @param b Right hand side node object.
*/
template <class String>
bool operator==(node<String> const& a, node<String> const& b)
@@ -75,12 +78,15 @@ bool operator==(node<String> const& a, node<String> const& b)
/** \brief Writes the node string to the stream.
* \ingroup any
*
* NOTE: Binary data is not converted to text.
* @param os Output stream.
* @param node Node object.
*
* \remark Binary data is not converted to text.
*/
template <class String>
std::ostream& operator<<(std::ostream& os, node<String> const& o)
std::ostream& operator<<(std::ostream& os, node<String> const& node)
{
os << to_string(o);
os << to_string(node);
return os;
}

View File

@@ -7,8 +7,9 @@
#ifndef AEDIS_RESP3_REQUEST_HPP
#define AEDIS_RESP3_REQUEST_HPP
#include <boost/hana.hpp>
#include <aedis/resp3/compose.hpp>
#include <boost/hana.hpp>
#include <boost/utility/string_view.hpp>
// NOTE: Consider detecting tuples in the type in the parameter pack
// to calculate the header size correctly.
@@ -18,15 +19,18 @@
namespace aedis {
namespace resp3 {
namespace detail {
/** @brief Creates Redis requests from user data.
bool has_push_response(boost::string_view cmd);
} // detail
/** @brief Creates Redis requests.
* \ingroup any
*
* A request is composed of one or more Redis commands and is
* referred to in the redis documentation as a pipeline, see
* https://redis.io/topics/pipelining.
*
* Example
* https://redis.io/topics/pipelining. For example
*
* @code
* request r;
@@ -44,9 +48,9 @@ namespace resp3 {
class request {
public:
/// Returns the number of commands contained in this request.
std::size_t commands() const noexcept { return commands_;};
std::size_t size() const noexcept { return commands_;};
/// Returns the request payload.
// Returns the request payload.
auto const& payload() const noexcept { return payload_;}
/// Clears the request preserving allocated memory.
@@ -78,14 +82,12 @@ public:
using boost::hana::make_tuple;
using resp3::type;
auto const before = payload_.size();
auto constexpr pack_size = sizeof...(Ts);
resp3::add_header(payload_, type::array, 1 + pack_size);
resp3::add_bulk(payload_, cmd);
resp3::add_bulk(payload_, make_tuple(args...));
auto const after = payload_.size();
if (!has_push_response(cmd))
if (!detail::has_push_response(cmd))
++commands_;
}
@@ -119,8 +121,6 @@ public:
if (begin == end)
return;
auto const before = payload_.size();
auto constexpr size = resp3::bulk_counter<value_type>::size;
auto const distance = std::distance(begin, end);
resp3::add_header(payload_, type::array, 2 + size * distance);
@@ -130,8 +130,7 @@ public:
for (; begin != end; ++begin)
resp3::add_bulk(payload_, *begin);
auto const after = payload_.size();
if (!has_push_response(cmd))
if (!detail::has_push_response(cmd))
++commands_;
}
@@ -161,7 +160,6 @@ public:
if (begin == end)
return;
auto const before = payload_.size();
auto constexpr size = resp3::bulk_counter<value_type>::size;
auto const distance = std::distance(begin, end);
resp3::add_header(payload_, type::array, 1 + size * distance);
@@ -170,8 +168,7 @@ public:
for (; begin != end; ++begin)
resp3::add_bulk(payload_, *begin);
auto const after = payload_.size();
if (!has_push_response(cmd))
if (!detail::has_push_response(cmd))
++commands_;
}
@@ -206,13 +203,11 @@ public:
push_range2(cmd, begin(range), end(range));
}
void set_internal() noexcept { is_internal_ = true;}
bool is_internal() const noexcept { return is_internal_;}
mutable bool close_on_run_completion = false;
private:
std::string payload_;
std::size_t commands_ = 0;
bool is_internal_ = true;
};
} // resp3

View File

@@ -5,6 +5,6 @@
*/
#include <aedis/impl/error.ipp>
#include <aedis/impl/command.ipp>
#include <aedis/resp3/impl/request.ipp>
#include <aedis/resp3/impl/type.ipp>
#include <aedis/resp3/detail/impl/parser.ipp>

View File

@@ -7,6 +7,17 @@
#include <iostream>
#include <stdlib.h>
void expect_true(bool a, std::string const& msg = "")
{
if (a) {
if (!msg.empty())
std::cout << "Success: " << msg << std::endl;
} else {
std::cout << "Error: " << msg << std::endl;
exit(EXIT_FAILURE);
}
}
template <class T>
void expect_eq(T const& a, T const& b, std::string const& msg = "")
{

422
tests/connection.cpp Normal file
View File

@@ -0,0 +1,422 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
// TODO: Avoid usage of co_await so the tests also build on compilers that
// don't support it.
// TODO: Add reconnect test that kills the server and waits some
// seconds.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/system/errc.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
#include "check.hpp"
namespace net = boost::asio;
using aedis::resp3::request;
using connection = aedis::connection<>;
using error_code = boost::system::error_code;
using net::experimental::as_tuple;
bool is_host_not_found(boost::system::error_code ec)
{
if (ec == net::error::netdb_errors::host_not_found) return true;
if (ec == net::error::netdb_errors::host_not_found_try_again) return true;
return false;
}
//----------------------------------------------------------------
// Tests whether resolve fails with the correct error.
void test_resolve()
{
connection::config cfg;
cfg.host = "Atibaia";
cfg.port = "6379";
cfg.resolve_timeout = std::chrono::seconds{100};
net::io_context ioc;
connection db{ioc, cfg};
db.async_run([](auto ec) {
expect_true(is_host_not_found(ec), "test_resolve");
});
ioc.run();
}
//----------------------------------------------------------------
void test_connect()
{
connection::config cfg;
cfg.host = "127.0.0.1";
cfg.port = "1";
cfg.connect_timeout = std::chrono::seconds{100};
net::io_context ioc;
connection db{ioc, cfg};
db.async_run([](auto ec) {
expect_error(ec, net::error::basic_errors::connection_refused, "test_connect");
});
ioc.run();
}
//----------------------------------------------------------------
// Test if quit causes async_run to exit.
void test_quit1(connection::config const& cfg)
{
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("QUIT");
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_quit1");
});
db->async_run([](auto ec){
expect_error(ec, net::error::misc_errors::eof, "test_quit1");
});
ioc.run();
}
void test_quit2(connection::config const& cfg)
{
std::cout << "test_quit2" << std::endl;
request req;
req.push("QUIT");
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
db->async_run(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_quit2");
});
ioc.run();
}
void test_quit()
{
connection::config cfg;
cfg.coalesce_requests = true;
test_quit1(cfg);
cfg.coalesce_requests = false;
test_quit1(cfg);
cfg.coalesce_requests = true;
test_quit2(cfg);
cfg.coalesce_requests = false;
test_quit2(cfg);
}
// Checks whether we get idle timeout when no push reader is set.
void test_missing_push_reader1(connection::config const& cfg)
{
std::cout << "test_missing_push_reader1" << std::endl;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("SUBSCRIBE", "channel");
db->async_run(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_missing_push_reader1");
});
ioc.run();
}
void test_missing_push_reader2(connection::config const& cfg)
{
std::cout << "test_missing_push_reader2" << std::endl;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req; // Wrong command syntax.
req.push("SUBSCRIBE");
db->async_run(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_missing_push_reader2");
});
ioc.run();
}
void test_missing_push_reader3(connection::config const& cfg)
{
std::cout << "test_missing_push_reader3" << std::endl;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req; // Wrong command syntax.
req.push("PING", "Message");
req.push("SUBSCRIBE");
db->async_run(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_missing_push_reader3");
});
ioc.run();
}
void test_idle()
{
std::cout << "test_idle" << std::endl;
connection::config cfg;
cfg.resolve_timeout = std::chrono::seconds{1};
cfg.connect_timeout = std::chrono::seconds{1};
cfg.ping_interval = std::chrono::seconds{1};
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("CLIENT", "PAUSE", 5000);
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_idle");
});
db->async_run([](auto ec){
expect_error(ec, aedis::error::idle_timeout, "test_idle");
});
ioc.run();
}
#ifdef BOOST_ASIO_HAS_CO_AWAIT
net::awaitable<void>
push_consumer1(std::shared_ptr<connection> db, bool& received, char const* msg)
{
{
auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable));
expect_no_error(ec, msg);
received = true;
}
{
auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable));
expect_error(ec, boost::asio::experimental::channel_errc::channel_cancelled, msg);
}
}
void test_push_is_received1(connection::config const& cfg)
{
std::cout << "test_push_is_received1" << std::endl;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("SUBSCRIBE", "channel");
req.push("QUIT");
db->async_run(req, aedis::adapt(), [db](auto ec, auto){
expect_no_error(ec, "test_push_is_received1");
db->cancel_event_receiver();
});
bool received = false;
net::co_spawn(
ioc.get_executor(),
push_consumer1(db, received, "test_push_is_received1"),
net::detached);
ioc.run();
expect_true(received);
}
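// test_push_is_received2 queues three requests with async_exec before
// async_run is started; they are executed once the connection is up.
// The SUBSCRIBE in req2 generates the push consumed by push_consumer1,
// and the final QUIT makes async_run finish with eof.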
void test_push_is_received2(connection::config const& cfg)
{
std::cout << "test_push_is_received2" << std::endl;
request req1;
req1.push("PING", "Message1");
request req2;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.push("PING", "Message2");
req3.push("QUIT");
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
auto handler =[](auto ec, auto...)
{ expect_no_error(ec, "test_push_is_received2"); };
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req3, aedis::adapt(), handler);
db->async_run([db](auto ec, auto...) {
expect_error(ec, net::error::misc_errors::eof, "test_push_is_received2");
db->cancel_event_receiver();
});
bool received = false;
net::co_spawn(
ioc.get_executor(),
push_consumer1(db, received, "test_push_is_received2"),
net::detached);
ioc.run();
expect_true(received);
}
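// test_reconnect_impl expects each (re)connection to emit the events
// resolve, connect and hello, in that order. Sending QUIT drops the
// session and, with enable_reconnect set, triggers the next cycle;
// after five cycles reconnection is disabled and the coroutine returns.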
net::awaitable<void> test_reconnect_impl(std::shared_ptr<connection> db)
{
request req;
req.push("QUIT");
for (auto i = 0;;) {
auto ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::resolve, "test_reconnect.");
ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::connect, "test_reconnect.");
ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::hello, "test_reconnect.");
co_await db->async_exec(req, aedis::adapt(), net::use_awaitable);
// Test 5 reconnections, then return.
++i;
if (i == 5) {
db->get_config().enable_reconnect = false;
co_return;
}
}
co_return;
}
// Test whether the client works after a reconnect.
void test_reconnect()
{
std::cout << "Start: test_reconnect" << std::endl;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor());
db->get_config().enable_events = true;
db->get_config().enable_reconnect = true;
db->get_config().reconnect_interval = std::chrono::milliseconds{100};
net::co_spawn(ioc, test_reconnect_impl(db), net::detached);
db->async_run([](auto ec) {
expect_error(ec, net::error::misc_errors::eof, "test_reconnect.");
});
ioc.run();
std::cout << "End: test_reconnect()" << std::endl;
}
net::awaitable<void>
push_consumer3(std::shared_ptr<connection> db)
{
for (;;)
co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
}
// Test many subscribe requests.
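// push_consumer3 drains events concurrently, presumably so that the
// SUBSCRIBE pushes interleaved with the pipelined responses do not
// block the connection's event channel while the requests complete.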
void test_push_many_subscribes(connection::config const& cfg)
{
std::cout << "test_push_many_subscribes" << std::endl;
request req0;
req0.push("HELLO", 3);
request req1;
req1.push("PING", "Message1");
request req2;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.push("QUIT");
auto handler =[](auto ec, auto...)
{
expect_no_error(ec, "test_push_many_subscribes");
};
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
db->async_exec(req0, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req3, aedis::adapt(), handler);
db->async_run([db](auto ec, auto...) {
expect_error(ec, net::error::misc_errors::eof, "test_push_many_subscribes");
db->cancel_event_receiver();
});
net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);
ioc.run();
}
#endif
void test_push()
{
connection::config cfg;
cfg.coalesce_requests = true;
#ifdef BOOST_ASIO_HAS_CO_AWAIT
test_push_is_received1(cfg);
test_push_is_received2(cfg);
test_push_many_subscribes(cfg);
#endif
test_missing_push_reader1(cfg);
test_missing_push_reader3(cfg);
cfg.coalesce_requests = false;
#ifdef BOOST_ASIO_HAS_CO_AWAIT
test_push_is_received1(cfg);
test_push_is_received2(cfg);
test_push_many_subscribes(cfg);
#endif
test_missing_push_reader2(cfg);
test_missing_push_reader3(cfg);
}
int main()
{
test_resolve();
test_connect();
test_quit();
test_push();
#ifdef BOOST_ASIO_HAS_CO_AWAIT
test_reconnect();
#endif
// Must come last as it sends a client pause.
test_idle();
}

@@ -1,391 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <iostream>
#include <map>
//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
#include <boost/asio.hpp>
#include <boost/system/errc.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
#include "check.hpp"
//std::cout << "aaaa " << ec.message() << " " << cmd << " " << n << std::endl;
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::resp3::request;
using connection = aedis::connection<>;
using error_code = boost::system::error_code;
using net::experimental::as_tuple;
using tcp = net::ip::tcp;
using boost::system::error_code;
auto print_read = [](auto cmd, auto n)
{
std::cout << cmd << ": " << n << std::endl;
};
//----------------------------------------------------------------
void test_resolve()
{
connection::config cfg;
cfg.resolve_timeout = std::chrono::seconds{100};
net::io_context ioc;
connection db{ioc, cfg};
db.async_run("Atibaia", "6379", [](auto ec) {
expect_error(ec, net::error::netdb_errors::host_not_found, "test_resolve");
});
ioc.run();
}
//----------------------------------------------------------------
void test_connect()
{
connection::config cfg;
cfg.connect_timeout = std::chrono::seconds{100};
net::io_context ioc;
connection db{ioc, cfg};
db.async_run("127.0.0.1", "1", [](auto ec) {
expect_error(ec, net::error::basic_errors::connection_refused, "test_connect");
});
ioc.run();
}
//----------------------------------------------------------------
// Test if quit causes async_run to exit.
void test_quit1()
{
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
request req;
req.push("HELLO", 3);
req.push("QUIT");
db->async_exec(req, aedis::adapt(), [](auto ec, auto r){
expect_no_error(ec, "test_quit1");
});
db->async_run("127.0.0.1", "6379", [](auto ec){
expect_error(ec, net::error::misc_errors::eof, "test_quit1");
});
ioc.run();
}
void test_quit2()
{
request req;
req.push("HELLO", 3);
req.push("QUIT");
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto n){
expect_error(ec, net::error::misc_errors::eof, "test_quit2");
});
ioc.run();
}
//----------------------------------------------------------------
net::awaitable<void>
push_consumer1(std::shared_ptr<connection> db)
{
{
auto [ec, n] = co_await db->async_read_push(aedis::adapt(), as_tuple(net::use_awaitable));
expect_no_error(ec, "push_consumer1");
}
{
auto [ec, n] = co_await db->async_read_push(aedis::adapt(), as_tuple(net::use_awaitable));
expect_error(ec, boost::asio::experimental::channel_errc::channel_cancelled, "push_consumer1");
}
}
// Tests whether a push is indeed delivered.
void test_push1()
{
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
req.push("QUIT");
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){
expect_error(ec, net::error::misc_errors::eof, "test_push1");
});
net::co_spawn(ioc.get_executor(), push_consumer1(db), net::detached);
ioc.run();
}
//----------------------------------------------------------------
net::awaitable<void> run5(std::shared_ptr<connection> db)
{
{
request req;
req.push("QUIT");
db->async_exec(req, aedis::adapt(), [](auto ec, auto n){
expect_no_error(ec, "test_quit1");
});
auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable));
expect_error(ec, net::error::misc_errors::eof, "run5a");
}
{
request req;
req.push("QUIT");
db->async_exec(req, aedis::adapt(), [](auto ec, auto n){
expect_no_error(ec, "test_quit1");
});
auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable));
expect_error(ec, net::error::misc_errors::eof, "run5a");
}
co_return;
}
// Test whether the client works after a reconnect.
void test_reconnect()
{
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor());
net::co_spawn(ioc, run5(db), net::detached);
ioc.run();
std::cout << "Success: test_reconnect()" << std::endl;
}
// Checks whether we get idle timeout when no push reader is set.
void test_no_push_reader1()
{
connection::config cfg;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){
expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader1");
});
ioc.run();
}
void test_no_push_reader2()
{
connection::config cfg;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req; // Wrong command syntax.
req.push("HELLO", 3);
req.push("SUBSCRIBE");
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){
expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader2");
});
ioc.run();
}
void test_no_push_reader3()
{
connection::config cfg;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req; // Wrong command syntax.
req.push("HELLO", 3);
req.push("PING", "Message");
req.push("SUBSCRIBE");
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){
expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader3");
});
ioc.run();
}
void test_idle()
{
connection::config cfg;
cfg.resolve_timeout = std::chrono::seconds{1};
cfg.connect_timeout = std::chrono::seconds{1};
cfg.ping_interval = std::chrono::seconds{1};
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
request req;
req.push("HELLO", 3);
req.push("CLIENT", "PAUSE", 5000);
db->async_exec(req, aedis::adapt(), [](auto ec, auto r){
expect_no_error(ec, "test_idle");
});
db->async_run("127.0.0.1", "6379", [](auto ec){
expect_error(ec, aedis::error::idle_timeout, "test_idle");
});
ioc.run();
}
auto handler =[](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
void test_push2()
{
request req1;
req1.push("HELLO", 3);
req1.push("PING", "Message1");
request req2;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.push("PING", "Message2");
req3.push("QUIT");
std::tuple<std::string, std::string> resp;
net::io_context ioc;
connection db{ioc};
db.async_exec(req1, aedis::adapt(resp), handler);
db.async_exec(req2, aedis::adapt(resp), handler);
db.async_exec(req3, aedis::adapt(resp), handler);
db.async_run("127.0.0.1", "6379", [&db](auto ec, auto...) {
std::cout << ec.message() << std::endl;
db.cancel_requests();
});
ioc.run();
}
net::awaitable<void>
push_consumer3(std::shared_ptr<connection> db)
{
for (;;)
co_await db->async_read_push(aedis::adapt(), net::use_awaitable);
}
void test_push3()
{
request req1;
req1.push("HELLO", 3);
req1.push("PING", "Message1");
request req2;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.push("QUIT");
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req3, aedis::adapt(), handler);
db->async_run("127.0.0.1", "6379", [db](auto ec, auto...) {
db->cancel_requests();
});
net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);
ioc.run();
}
void test_exec_while_processing()
{
request req1;
req1.push("HELLO", 3);
req1.push("PING", "Message1");
request req2;
req2.push("SUBSCRIBE", "channel");
request req3;
req3.push("QUIT");
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
db->async_exec(req1, aedis::adapt(), [db, &req1](auto ec, auto) {
db->async_exec(req1, aedis::adapt(), handler);
});
db->async_exec(req1, aedis::adapt(), [db, &req2](auto ec, auto) {
db->async_exec(req2, aedis::adapt(), handler);
});
db->async_exec(req2, aedis::adapt(), [db, &req2](auto ec, auto) {
db->async_exec(req2, aedis::adapt(), handler);
});
db->async_exec(req1, aedis::adapt(), [db, &req1](auto ec, auto) {
db->async_exec(req1, aedis::adapt(), handler);
});
db->async_exec(req2, aedis::adapt(), [db, &req3](auto ec, auto) {
db->async_exec(req3, aedis::adapt(), handler);
});
db->async_run("127.0.0.1", "6379", [db](auto ec, auto...) {
db->cancel_requests();
});
net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);
ioc.run();
std::cout << "Success: test_exec_while_processing()" << std::endl;
}
int main()
{
test_resolve();
test_connect();
test_quit1();
test_quit2();
test_push1();
test_push2();
test_push3();
test_no_push_reader1();
test_no_push_reader2();
test_no_push_reader3();
test_reconnect();
test_exec_while_processing();
// Must come last as it sends a client pause.
test_idle();
}

@@ -16,7 +16,7 @@
#include <boost/asio/co_spawn.hpp>
#include <boost/beast/_experimental/test/stream.hpp>
#include <aedis/aedis.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
#include "check.hpp"
@@ -36,7 +36,7 @@ struct expect {
std::string in;
Result expected;
std::string name;
boost::system::error_code ec;
boost::system::error_code ec{};
};
template <class Result>
@@ -74,7 +74,7 @@ public:
void run()
{
auto self = this->shared_from_this();
auto f = [self](auto ec, auto n)
auto f = [self](auto ec, auto)
{
expect_error(ec, self->data_.ec);
if (self->data_.ec)

@@ -9,7 +9,7 @@
#include <boost/asio/connect.hpp>
#include <aedis/aedis.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
namespace net = boost::asio;