2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-25 06:32:08 +00:00

Compare commits

...

14 Commits

Author SHA1 Message Date
Marcelo Zimbres
11807c82b7 Improves documentations of the connection class. 2022-08-21 12:50:29 +02:00
Marcelo Zimbres
24a215d78b First steps with cmake support. 2022-08-20 22:12:33 +02:00
Marcelo Zimbres
b7abe20703 CI fix. 2022-08-20 12:18:59 +02:00
Marcelo Zimbres
225095944c Commit of the following:
- Adds sync class the offer a thread-safe and synchronous API.
- Fixes documentation of adapt functions.
- Removes compose.hpp header.
- Adds test to aedis::error and resp3::type.
- Simplifies some code.
2022-08-20 11:56:31 +02:00
Marcelo Zimbres
a31d797e43 Moves sync functions from experimental to connection and improves code coverage. 2022-08-18 22:17:33 +02:00
Marcelo Zimbres
cca8d5d6dc Improvements in the examples, docs, sync functions and coverage. 2022-08-17 22:30:59 +02:00
Marcelo Zimbres
6c5bee6920 Fixes bug in the context of reconnecting and events. 2022-08-16 22:18:27 +02:00
Marcelo Zimbres
c4714d0037 Splits async_receive_event in two functions. 2022-08-15 22:45:55 +02:00
Marcelo Zimbres
38bf2395af Fix coverage and ports tests to boost.test. 2022-08-14 21:46:56 +02:00
Marcelo Zimbres
7511d6b4d8 Progress porting to boost.test. 2022-08-13 22:52:42 +02:00
Marcelo Zimbres
ddc2815fe5 Progress with coverage report. 2022-08-13 17:00:18 +02:00
Marcelo Zimbres
de6f5de655 Adds coverage file. 2022-08-12 22:43:37 +02:00
Marcelo Zimbres
8d454ada0e Simplifies the cancellation of some connection async_ functions. 2022-08-12 21:53:04 +02:00
Marcelo Zimbres
ebac88f2ca Improvementes in the CI script. 2022-08-07 18:44:28 +02:00
39 changed files with 1429 additions and 1112 deletions

13
.codecov.yml Normal file
View File

@@ -0,0 +1,13 @@
codecov:
max_report_age: off
require_ci_to_pass: yes
notify:
after_n_builds: 1
wait_for_ci: yes
ignore:
- "benchmarks/cpp/asio/*"
- "examples/*"
- "tests/*"
- "/usr/*"
- "**/boost/*"

View File

@@ -1,30 +1,35 @@
name: CI
on:
push:
branches: [ master ]
on: [push, pull_request]
jobs:
build:
posix:
defaults:
run:
shell: bash
strategy:
fail-fast: false
matrix:
build-type: ['sanity']
runs-on: [ubuntu-22.04]
compiler: [g++-11, clang++-11, clang++-13]
cxx-std: ['c++20']
optim-level: ['-O0']
runs-on: ${{ matrix.runs-on }}
include:
- { toolset: gcc, compiler: g++-10, install: g++-10, os: ubuntu-22.04, cxxstd: 'c++17' }
- { toolset: gcc, compiler: g++-11, install: g++-11, os: ubuntu-22.04, cxxstd: 'c++17' }
- { toolset: gcc, compiler: g++-11, install: g++-11, os: ubuntu-22.04, cxxstd: 'c++20' }
- { toolset: clang, compiler: clang++-11, install: clang-11, os: ubuntu-22.04, cxxstd: 'c++17' }
- { toolset: clang, compiler: clang++-11, install: clang-11, os: ubuntu-22.04, cxxstd: 'c++20' }
- { toolset: clang, compiler: clang++-13, install: clang-13, os: ubuntu-22.04, cxxstd: 'c++17' }
- { toolset: clang, compiler: clang++-13, install: clang-13, os: ubuntu-22.04, cxxstd: 'c++20' }
runs-on: ${{ matrix.os }}
env:
CXX: ${{ matrix.compiler }}
CXXFLAGS: -std=${{ matrix.cxx-std }} ${{ matrix.optim-level }} -Wall -Wextra
CXXFLAGS: -std=${{matrix.cxxstd}} -Wall -Wextra
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install Autotools
run: sudo apt install automake
- name: Install compiler
run: sudo apt-get install -y ${{ matrix.compiler }}
run: sudo apt-get install -y ${{ matrix.install }}
- name: Install Redis
run: sudo apt-get install -y redis-server
- name: Install boost

50
.github/workflows/coverage.yml vendored Normal file
View File

@@ -0,0 +1,50 @@
name: Coverage
on:
push:
branches:
- master
jobs:
posix:
defaults:
run:
shell: bash
runs-on: ubuntu-22.04
env:
CXX: g++-11
CXXFLAGS: -std=c++20 -Wall -Wextra --coverage -fkeep-inline-functions -fkeep-static-functions -O0
LDFLAGS: --coverage
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install Autotools
run: sudo apt install automake
- name: Install compiler
run: sudo apt-get install -y g++-11
- name: Install Redis
run: sudo apt-get install -y redis-server
- name: Install boost
uses: MarkusJx/install-boost@v2.3.0
id: install-boost
with:
boost_version: 1.79.0
platform_version: 22.04
- name: Configure
run: |
autoreconf -i
./configure --with-boost=${{ steps.install-boost.outputs.BOOST_ROOT }}
- name: Build
run: make
- name: Check
run: make check VERBOSE=1
# - name: Generate coverage report
# run: |
# lcov --base-directory . --directory tests/ --output-file aedis.info --capture
# lcov --remove aedis.info '/usr/*' "${{ steps.install-boost.outputs.BOOST_ROOT }}/include/boost/*" --output-file aedis.info
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
gcov: true
working_directory: ${{ env.GITHUB_WORKSPACE }}

View File

@@ -1,5 +1,31 @@
# Changelog
## v1.0.0
* Adds experimental cmake support for windows users.
* Adds new class `aedis::sync` that wraps an `aedis::connection` in
a thread-safe and synchronous API. All free functions from the
`sync.hpp` are now member functions of `aedis::sync`.
* Split `aedis::connection::async_receive_event` in two functions, one
to receive events and another for server side pushes, see
`aedis::connection::async_receive_push`.
* Removes collision between `aedis::adapter::adapt` and
`aedis::adapt`.
* Adds `connection::operation` enum to replace `cancel_*` member
functions with a single cancel function that gets the operations
that should be cancelled as argument.
* Bugfix: a bug on reconnect from a state where the `connection` object
had unsent commands. It could cause `async_exec` to never
complete under certain conditions.
* Bugfix: Documentation of `adapt()` functions were missing from
doxygen.
## v0.3.0
* Adds `experimental::exec` and `receive_event` functions to offer a

View File

@@ -1,2 +1,50 @@
# This is ongoing work. At the moment autotools is still the supported
# build system.
# At the moment the official build system is still autotools and this
# file is meant to support Aedis on windows.
cmake_minimum_required(VERSION 3.14)
project(
Aedis
VERSION 1.0.0
DESCRIPTION "An async redis client designed for performance and scalability"
HOMEPAGE_URL "https://mzimbres.github.io/aedis"
LANGUAGES CXX
)
add_library(aedis INTERFACE)
target_include_directories(aedis INTERFACE include)
find_package(Boost 1.79 REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})
enable_testing()
include_directories(include)
add_executable(chat_room examples/chat_room.cpp)
add_executable(containers examples/containers.cpp)
add_executable(echo_server examples/echo_server.cpp)
add_executable(intro examples/intro.cpp)
add_executable(intro_sync examples/intro_sync.cpp)
add_executable(serialization examples/serialization.cpp)
add_executable(subscriber examples/subscriber.cpp)
add_executable(subscriber_sync examples/subscriber_sync.cpp)
add_executable(test_low_level tests/low_level.cpp)
add_executable(test_connection tests/connection.cpp)
add_executable(low_level_sync tests/low_level_sync.cpp)
add_test(containers containers)
add_test(intro intro)
add_test(intro_sync intro_sync)
add_test(serialization serialization)
add_test(test_low_level test_low_level)
add_test(test_connection test_connection)
add_test(low_level_sync low_level_sync)
include(GNUInstallDirs)
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/include/boost
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
FILES_MATCHING
PATTERN "*.hpp"
PATTERN "*.ipp"
)

View File

@@ -1 +0,0 @@
See https://mzimbres.github.io/aedis/#using-aedis

View File

@@ -54,9 +54,7 @@ echo_server_direct_SOURCES = $(top_srcdir)/benchmarks/cpp/asio/echo_server_direc
echo_server_client_SOURCES = $(top_srcdir)/benchmarks/cpp/asio/echo_server_client.cpp
endif
nobase_noinst_HEADERS =\
$(top_srcdir)/examples/print.hpp\
$(top_srcdir)/tests/check.hpp
nobase_noinst_HEADERS = $(top_srcdir)/examples/print.hpp
TESTS = $(check_PROGRAMS)
@@ -79,12 +77,20 @@ EXTRA_DIST += $(top_srcdir)/benchmarks/rust/echo_server_direct/Cargo.toml
EXTRA_DIST += $(top_srcdir)/benchmarks/rust/echo_server_direct/src/main.rs
EXTRA_DIST += $(top_srcdir)/benchmarks/rust/echo_server_over_redis/Cargo.toml
EXTRA_DIST += $(top_srcdir)/benchmarks/rust/echo_server_over_redis/src/main.rs
EXTRA_DIST += $(top_srcdir)/CMakeLists.txt
.PHONY: doc
doc:
doxygen doc/Doxyfile
.PHONY: coverage
coverage:
lcov --base-directory . --directory tests/ --output-file aedis.info --capture
lcov --remove aedis.info '/usr/*' '/opt/boost_1_79_0/include/boost/*' --output-file aedis.info
genhtml --output-directory html aedis.info
.PHONY: bench
bench:
pdflatex --jobname=echo-f0 benchmarks/benchmarks.tex
pdflatex --jobname=echo-f1 benchmarks/benchmarks.tex

View File

@@ -1,16 +1,20 @@
Branch | GH Actions | codecov.io |
:-------------: | ---------- | ---------- |
[`master`](https://github.com/mzimbres/aedis/tree/master) | [![CI](https://github.com/mzimbres/aedis/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/mzimbres/aedis/actions/workflows/ci.yml) | [![codecov](https://codecov.io/gh/mzimbres/aedis/branch/master/graph/badge.svg)](https://codecov.io/gh/mzimbres/aedis/branch/master)
## Aedis
An async redis client designed for performance and scalability
### License
Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt).
### Build Status
Branch | GH Actions |
:-------------: | ---------- |
[`master`](https://github.com/mzimbres/aedis/tree/master) | [![CI](https://github.com/mzimbres/aedis/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/mzimbres/aedis/actions/workflows/ci.yml)
### More information
* See the official github-pages for documentation: https://mzimbres.github.io/aedis
### Installation
See https://mzimbres.github.io/aedis/#using-aedis

View File

@@ -85,9 +85,3 @@ The code used in the benchmarks can be found at
## Running the benchmarks
Run one of the echo-server programs in one terminal and the [echo-server-client](https://github.com/mzimbres/aedis/blob/42880e788bec6020dd018194075a211ad9f339e8/benchmarks/cpp/asio/echo_server_client.cpp) in another.
## Contributing
If your spot any performance improvement in any of the example or
would like to include other clients, please open a PR and I will
gladly merge it.

View File

@@ -1,5 +1,5 @@
AC_PREREQ([2.69])
AC_INIT([Aedis], [0.3.0], [mzimbres@gmail.com])
AC_INIT([Aedis], [1.0.0], [mzimbres@gmail.com])
AC_CONFIG_MACRO_DIR([m4])
AC_CONFIG_HEADERS([config.h])
AC_CONFIG_SRCDIR(include/aedis.hpp)
@@ -19,10 +19,6 @@ AC_CHECK_HEADER_STDBOOL
AC_TYPE_UINT64_T
AC_CHECK_TYPES([ptrdiff_t])
AX_CXX_COMPILE_STDCXX(17, , mandatory)
AX_CXX_COMPILE_STDCXX(20, , optional)
AM_CONDITIONAL(HAVE_CXX20,[test x$HAVE_CXX20 == x1])
# This check has been stolen from Asio
AC_MSG_CHECKING([whether coroutines are enabled])
AC_COMPILE_IFELSE(

View File

@@ -17,47 +17,49 @@
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using aedis::resp3::node;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
using stream_descriptor = net::use_awaitable_t<>::as_default_on_t<net::posix::stream_descriptor>;
using connection = aedis::connection<tcp_socket>;
using response_type = std::vector<aedis::resp3::node<std::string>>;
// Chat over redis pubsub. To test, run this program from different
// terminals and type messages to stdin. You may also want to run
// terminals and type messages to stdin. Use
//
// $ redis-cli
// > monitor
// $ redis-cli monitor
//
// To see the message traffic.
// to monitor the message traffic.
net::awaitable<void> subscriber(std::shared_ptr<connection> db)
// Receives messages from other users.
net::awaitable<void> push_receiver(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "chat-channel");
for (response_type resp;;) {
auto const ev = co_await db->async_receive_event(aedis::adapt(resp));
switch (ev) {
case connection::event::push:
print_push(resp);
break;
case connection::event::hello:
co_await db->async_exec(req);
break;
default:;
}
for (std::vector<node<std::string>> resp;;) {
co_await db->async_receive_push(adapt(resp));
print_push(resp);
resp.clear();
}
}
net::awaitable<void>
publisher(net::posix::stream_descriptor& in, std::shared_ptr<connection> db)
// Subscribes to the channels when a new connection is stablished.
net::awaitable<void> event_receiver(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "chat-channel");
for (;;) {
auto ev = co_await db->async_receive_event();
if (ev == connection::event::hello)
co_await db->async_exec(req);
}
}
// Publishes messages to other users.
net::awaitable<void> publisher(stream_descriptor& in, std::shared_ptr<connection> db)
{
for (std::string msg;;) {
std::size_t n = co_await net::async_read_until(in, net::dynamic_buffer(msg, 1024), "\n", net::use_awaitable);
auto n = co_await net::async_read_until(in, net::dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "chat-channel", msg);
co_await db->async_exec(req);
@@ -69,16 +71,16 @@ int main()
{
try {
net::io_context ioc{1};
net::posix::stream_descriptor in{ioc, ::dup(STDIN_FILENO)};
stream_descriptor in{ioc, ::dup(STDIN_FILENO)};
auto db = std::make_shared<connection>(ioc);
db->get_config().enable_events = true;
db->get_config().enable_reconnect = true;
co_spawn(ioc, publisher(in, db), net::detached);
co_spawn(ioc, subscriber(db), net::detached);
db->async_run([](auto ec) {
std::cout << ec.message() << std::endl;
});
co_spawn(ioc, push_receiver(db), net::detached);
co_spawn(ioc, event_receiver(db), net::detached);
db->async_run(net::detached);
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });

View File

@@ -28,11 +28,12 @@ int main()
{{"key1", 10}, {"key2", 20}, {"key3", 30}};
request req;
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);
req.push_range("RPUSH", "rpush-key", vec); // Sends
req.push_range("HSET", "hset-key", map); // Sends
req.push("MULTI");
req.push("LRANGE", "rpush-key", 0, -1);
req.push("HGETALL", "hset-key");
req.push("LRANGE", "rpush-key", 0, -1); // Retrieves
req.push("HGETALL", "hset-key"); // Retrieves
req.push("EXEC");
req.push("QUIT");
@@ -48,7 +49,7 @@ int main()
net::io_context ioc;
connection db{ioc};
db.async_run(req, aedis::adapt(resp), [](auto ec, auto) {
db.async_run(req, adapt(resp), [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});
ioc.run();

View File

@@ -25,22 +25,17 @@ using connection = aedis::connection<tcp_socket>;
awaitable_type echo_loop(tcp_socket socket, std::shared_ptr<connection> db)
{
try {
request req;
std::tuple<std::string> resp;
std::string buffer;
request req;
std::tuple<std::string> resp;
for (;;) {
auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n");
req.push("PING", buffer);
co_await db->async_exec(req, adapt(resp));
co_await net::async_write(socket, net::buffer(std::get<0>(resp)));
std::get<0>(resp).clear();
req.clear();
buffer.erase(0, n);
}
} catch (std::exception const& e) {
std::cout << e.what() << std::endl;
for (std::string buffer;;) {
auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n");
req.push("PING", buffer);
co_await db->async_exec(req, adapt(resp));
co_await net::async_write(socket, net::buffer(std::get<0>(resp)));
std::get<0>(resp).clear();
req.clear();
buffer.erase(0, n);
}
}

View File

@@ -6,9 +6,9 @@
#include <tuple>
#include <string>
#include <boost/asio.hpp>
#include <thread>
#include <boost/asio/io_context.hpp>
#include <aedis.hpp>
#include <aedis/experimental/sync.hpp>
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
@@ -16,29 +16,30 @@
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using aedis::experimental::exec;
using connection = aedis::connection<>;
using connection = aedis::sync<aedis::connection<>>;
int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
auto work = net::make_work_guard(ioc);
std::thread t1{[&]() { ioc.run(); }};
std::thread thread{[&]() {
conn.async_run(net::detached);
ioc.run();
}};
connection conn{work.get_executor()};
std::thread t2{[&]() { boost::system::error_code ec; conn.run(ec); }};
request req;
req.push("PING");
req.push("QUIT");
std::tuple<std::string, aedis::ignore> resp;
exec(conn, req, adapt(resp));
thread.join();
conn.exec(req, adapt(resp));
std::cout << "Response: " << std::get<0>(resp) << std::endl;
work.reset();
t1.join();
t2.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}

View File

@@ -17,8 +17,6 @@
namespace net = boost::asio;
using aedis::resp3::node;
using aedis::adapter::adapt;
using aedis::adapter::adapter_t;
void print_aggr(std::vector<aedis::resp3::node<std::string>>& v)
{

View File

@@ -20,6 +20,7 @@
namespace net = boost::asio;
using aedis::resp3::request;
using aedis::adapt;
using connection = aedis::connection<>;
using namespace boost::json;
@@ -86,21 +87,18 @@ int main()
net::io_context ioc;
connection db{ioc};
// Request that sends the containers.
std::set<user> users
{ {"Joao", "56", "Brazil"}
, {"Serge", "60", "France"}
};
{{"Joao", "58", "Brazil"} , {"Serge", "60", "France"}};
request req;
req.push("HELLO", 3);
req.push_range("SADD", "sadd-key", users);
req.push("SMEMBERS", "sadd-key");
req.push_range("SADD", "sadd-key", users); // Sends
req.push("SMEMBERS", "sadd-key"); // Retrieves
req.push("QUIT");
std::tuple<aedis::ignore, int, std::set<user>, std::string> resp;
db.async_run(req, aedis::adapt(resp), [](auto ec, auto) {
db.async_run(req, adapt(resp), [](auto ec, auto) {
std::cout << ec.message() << std::endl;
});

View File

@@ -15,11 +15,10 @@
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using node_type = aedis::resp3::node<std::string>;
using aedis::resp3::node;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using connection = aedis::connection<tcp_socket>;
@@ -28,7 +27,7 @@ using connection = aedis::connection<tcp_socket>;
* To test send messages with redis-cli
*
* $ redis-cli -3
* 127.0.0.1:6379> PUBLISH channel1 some-message
* 127.0.0.1:6379> PUBLISH channel some-message
* (integer) 3
* 127.0.0.1:6379>
*
@@ -39,30 +38,26 @@ using connection = aedis::connection<tcp_socket>;
* > CLIENT kill TYPE pubsub
*/
net::awaitable<void> receiver(std::shared_ptr<connection> db)
// Receives pushes.
net::awaitable<void> push_receiver(std::shared_ptr<connection> db)
{
for (std::vector<node<std::string>> resp;;) {
co_await db->async_receive_push(adapt(resp));
print_push(resp);
resp.clear();
}
}
// Receives events.
net::awaitable<void> event_receiver(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "channel");
for (std::vector<node_type> resp;;) {
auto const ev = co_await db->async_receive_event(aedis::adapt(resp));
std::cout << "Event: " << aedis::to_string<tcp_socket>(ev) << std::endl;
switch (ev) {
case connection::event::push:
print_push(resp);
resp.clear();
break;
case connection::event::hello:
// Subscribes to the channels when a new connection is
// stablished.
for (;;) {
auto ev = co_await db->async_receive_event();
if (ev == connection::event::hello)
co_await db->async_exec(req);
break;
default:;
}
}
}
@@ -75,10 +70,13 @@ int main()
db->get_config().enable_events = true;
db->get_config().enable_reconnect = true;
net::co_spawn(ioc, receiver(db), net::detached);
net::co_spawn(ioc, push_receiver(db), net::detached);
net::co_spawn(ioc, event_receiver(db), net::detached);
db->async_run(net::detached);
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });
ioc.run();
} catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;

View File

@@ -6,45 +6,33 @@
#include <tuple>
#include <string>
#include <thread>
#include <boost/asio.hpp>
#include <aedis.hpp>
#include <aedis/experimental/sync.hpp>
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::resp3::request;
using aedis::experimental::exec;
using aedis::experimental::receive_event;
using connection = aedis::connection<>;
using aedis::adapt;
using aedis::resp3::node;
using aedis::resp3::request;
using connection = aedis::sync<aedis::connection<>>;
using event = connection::event;
// See subscriber.cpp for more info about how to run this example.
void subscriber(connection& conn)
// Subscribe again everytime there is a disconnection.
void event_receiver(connection& conn)
{
request req;
req.push("SUBSCRIBE", "channel");
for (std::vector<node<std::string>> resp;;) {
auto const ev = receive_event(conn, aedis::adapt(resp));
switch (ev) {
case connection::event::push:
print_push(resp);
resp.clear();
break;
case connection::event::hello:
// Subscribes to the channels when a new connection is
// stablished.
exec(conn, req);
break;
default:;
}
for (;;) {
auto ev = conn.receive_event();
if (ev == connection::event::hello)
conn.exec(req);
}
}
@@ -52,18 +40,26 @@ int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
auto work = net::make_work_guard(ioc);
conn.get_config().enable_events = true;
conn.get_config().enable_reconnect = true;
connection::config cfg;
cfg.enable_events = true;
cfg.enable_reconnect = true;
connection conn{work.get_executor(), cfg};
std::thread thread{[&]() {
conn.async_run(net::detached);
ioc.run();
}};
std::thread t1{[&]() { ioc.run(); }};
std::thread t2{[&]() { boost::system::error_code ec; conn.run(ec); }};
std::thread t3{[&]() { event_receiver(conn); }};
subscriber(conn);
thread.join();
for (std::vector<node<std::string>> resp;;) {
conn.receive_push(adapt(resp));
print_push(resp);
resp.clear();
}
t1.join();
t2.join();
t3.join();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;

View File

@@ -6,14 +6,13 @@ nobase_include_HEADERS =\
$(top_srcdir)/include/aedis/detail/net.hpp\
$(top_srcdir)/include/aedis/connection.hpp\
$(top_srcdir)/include/aedis/adapt.hpp\
$(top_srcdir)/include/aedis/sync.hpp\
$(top_srcdir)/include/aedis/detail/connection_ops.hpp\
$(top_srcdir)/include/aedis.hpp\
$(top_srcdir)/include/aedis/experimental/sync.hpp\
$(top_srcdir)/include/aedis/adapter/detail/adapters.hpp\
$(top_srcdir)/include/aedis/adapter/adapt.hpp\
$(top_srcdir)/include/aedis/adapter/detail/response_traits.hpp\
$(top_srcdir)/include/aedis/resp3/node.hpp\
$(top_srcdir)/include/aedis/resp3/compose.hpp\
$(top_srcdir)/include/aedis/resp3/detail/read_ops.hpp\
$(top_srcdir)/include/aedis/resp3/detail/parser.hpp\
$(top_srcdir)/include/aedis/resp3/detail/exec.hpp\

View File

@@ -10,6 +10,7 @@
#include <aedis/error.hpp>
#include <aedis/adapt.hpp>
#include <aedis/connection.hpp>
#include <aedis/sync.hpp>
#include <aedis/resp3/request.hpp>
/** \mainpage Documentation
@@ -21,8 +22,8 @@
Aedis is a high-level [Redis](https://redis.io/) client library
built on top of
[Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html),
some of its distinctive features are
[Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html).
Some of its distinctive features are
\li Support for the latest version of the Redis communication protocol [RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md).
\li First class support for STL containers and C++ built-in types.
@@ -30,11 +31,11 @@
\li Healthy checks, back pressure and low latency.
\li Hides most of the low level asynchronous operations away from the user.
Let us start with an overview of asynchronous code.
Let us have a look a some code snippets
@subsection Async
The code below sends a ping command to Redis (see intro.cpp)
The code below sends a ping command to Redis and quits (see intro.cpp)
@code
int main()
@@ -55,9 +56,9 @@
}
@endcode
The connection class maintains a healthy connection with
Redis over which users can execute their commands, without any
need of queuing. For example, to execute more than one command
The connection class maintains a healthy connection with Redis
over which users can execute their commands, without any need of
queuing. For example, to execute more than one request
@code
int main()
@@ -77,7 +78,7 @@
}
@endcode
The `async_exec` functions above can be called from different
The `connection::async_exec` functions above can be called from different
places in the code without knowing about each other, see for
example echo_server.cpp. Server-side pushes are supported on the
same connection where commands are executed, a typical subscriber
@@ -87,61 +88,42 @@
@code
net::awaitable<void> reader(std::shared_ptr<connection> db)
{
request req;
req.push("SUBSCRIBE", "channel");
for (std::vector<node_type> resp;;) {
auto ev = co_await db->async_receive_event(aedis::adapt(resp));
switch (ev) {
case connection::event::push:
// Use resp.
resp.clear();
break;
case connection::event::hello:
// Subscribes to channels when a new connection is
// stablished.
co_await db->async_exec(req);
break;
default:;
}
co_await db->async_receive_event(adapt(resp));
// Use resp and clear it.
resp.clear();
}
}
@endcode
@subsection Sync
The `connection` class is async-only, many users however need to
interact with it synchronously, this is also supported by Aedis as long
as this interaction occurs across threads, for example (see
intro_sync.cpp)
The `connection` class offers only an asynchronous API.
Synchronous communications with redis is provided by the `aedis::sync`
wrapper class. (see intro_sync.cpp)
@code
int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
std::thread thread{[&]() {
conn.async_run(net::detached);
ioc.run();
}};
request req;
req.push("PING");
req.push("QUIT");
std::tuple<std::string, aedis::ignore> resp;
exec(conn, req, adapt(resp));
thread.join();
std::cout << "Response: " << std::get<0>(resp) << std::endl;
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
net::io_context ioc{1};
auto work = net::make_work_guard(ioc);
std::thread t1{[&]() { ioc.run(); }};
sync<connection> conn{work.get_executor()};
std::thread t2{[&]() { boost::system::error_code ec; conn.run(ec); }};
request req;
req.push("PING");
req.push("QUIT");
std::tuple<std::string, aedis::ignore> resp;
conn.exec(req, adapt(resp));
std::cout << "Response: " << std::get<0>(resp) << std::endl;
work.reset();
t1.join();
t2.join();
}
@endcode
@@ -156,23 +138,27 @@
For a simple installation run
```
# Clone the repository and checkout the lastest release tag.
$ git clone --branch v0.3.0 https://github.com/mzimbres/aedis.git
$ git clone --branch v1.0.0 https://github.com/mzimbres/aedis.git
$ cd aedis
# Build an example
# Option 1: Direct compilation.
$ g++ -std=c++17 -pthread examples/intro.cpp -I./include -I/path/boost_1_79_0/include/
# Option 2: Use cmake.
$ BOOST_ROOT=/opt/boost_1_79_0/ cmake -DCMAKE_CXX_FLAGS=-std=c++20 .
```
@note CMake support is still experimental.
For a proper full installation on the system run
```
# Download and unpack the latest release
$ wget https://github.com/mzimbres/aedis/releases/download/v0.3.0/aedis-0.2.1.tar.gz
$ tar -xzvf aedis-0.2.1.tar.gz
$ wget https://github.com/mzimbres/aedis/releases/download/v1.0.0/aedis-1.0.0.tar.gz
$ tar -xzvf aedis-1.0.0.tar.gz
# Configure, build and install
$ ./configure --prefix=/opt/aedis-0.2.1 --with-boost=/opt/boost_1_78_0
$ CXXFLAGS="-std=c++17" ./configure --prefix=/opt/aedis-1.0.0 --with-boost=/opt/boost_1_78_0
$ sudo make install
```
@@ -209,12 +195,15 @@
$ autoreconf -i
```
After that you will have a configure script that you can run as
explained above, for example, to use a compiler other that the
system compiler run
After that we get a configure script that can be run as explained
above, for example, to build with a compiler other that the system
compiler with coverage support run
```
$ CXX=clang++-14 CXXFLAGS="-g" ./configure --with-boost=...
$ CXX=clang++-14 \
CXXFLAGS="-g -std=c++20 -Wall -Wextra --coverage -fkeep-inline-functions -fkeep-static-functions" \
LDFLAGS="--coverage" \
./configure --with-boost=/opt/boost_1_79_0
```
To generate release tarballs run
@@ -223,7 +212,6 @@
$ make distcheck
```
\section requests Requests
Redis requests are composed of one of more Redis commands (in
@@ -377,7 +365,7 @@
To read the response to transactions we have to observe that Redis
queues the commands as they arrive and sends the responses back to
the user in a single array, in the response to the @c exec command.
the user as an array, in the response to the @c exec command.
For example, to read the response to the this request
@code
@@ -394,7 +382,7 @@
using aedis::ignore;
using boost::optional;
using tresp_type =
using exec_resp_type =
std::tuple<
optional<std::string>, // get
optional<std::vector<std::string>>, // lrange
@@ -406,7 +394,7 @@
ignore, // get
ignore, // lrange
ignore, // hgetall
tresp_type, // exec
exec_resp_type, // exec
> resp;
co_await db->async_exec(req, adapt(resp));
@@ -440,7 +428,7 @@
There are cases where responses to Redis
commands won't fit in the model presented above, some examples are
@li Commands (like \c set) whose response don't have a fixed
@li Commands (like \c set) whose responses don't have a fixed
RESP3 type. Expecting an \c int and receiving a blob-string
will result in error.
@li RESP3 aggregates that contain nested aggregates can't be read in STL containers.
@@ -484,14 +472,14 @@
@endcode
For example, suppose we want to retrieve a hash data structure
from Redis with \c hgetall, some of the options are
from Redis with `HGETALL`, some of the options are
@li \c std::vector<node<std::string>: Works always.
@li \c std::vector<std::string>: Efficient and flat, all elements as string.
@li \c std::map<std::string, std::string>: Efficient if you need the data as a \c std::map
@li \c std::map<U, V>: Efficient if you are storing serialized data. Avoids temporaries and requires \c from_bulk for \c U and \c V.
In addition to the above users can also use unordered versions of the containers. The same reasoning also applies to sets e.g. \c smembers.
In addition to the above users can also use unordered versions of the containers. The same reasoning also applies to sets e.g. `SMEMBERS`.
\section examples Examples
@@ -500,8 +488,8 @@
@li intro.cpp: Basic steps with Aedis.
@li intro_sync.cpp: Synchronous version of intro.cpp.
@li containers.cpp: Shows how to send and receive stl containers.
@li serialization.cpp: Shows the \c request support to serialization of user types.
@li subscriber.cpp: Shows how to subscribe to a channel and how to reconnect when connection is lost.
@li serialization.cpp: Shows how to serialize your own types.
@li subscriber.cpp: Shows how to use pubsub.
@li subscriber_sync.cpp: Synchronous version of subscriber.cpp.
@li echo_server.cpp: A simple TCP echo server that uses coroutines.
@li chat_room.cpp: A simple chat room that uses coroutines.

View File

@@ -20,12 +20,13 @@
namespace aedis {
/** @brief A type that ignores responses.
/** @brief Tag used to ignore responses.
* @ingroup any
*
* For example
*
* @code
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
* std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
* @endcode
*
* will cause only the second tuple type to be parsed, the others
@@ -84,7 +85,7 @@ private:
adapter_type adapter_;
public:
vector_adapter(Vector& v) : adapter_{adapter::adapt(v)} { }
vector_adapter(Vector& v) : adapter_{adapter::adapt2(v)} { }
auto supported_response_size() const noexcept { return std::size_t(-1);}
@@ -131,9 +132,10 @@ struct response_traits<std::tuple<Ts...>> {
} // detail
/** @brief Creates an adapter that ignores responses.
* @ingroup any
*
* This function can be used to create adapters that ignores
* responses. As a result it can improve performance.
* responses.
*/
auto adapt() noexcept
{
@@ -141,6 +143,7 @@ auto adapt() noexcept
}
/** @brief Adapts a type to be used as a response.
* @ingroup any
*
* The type T can be any STL container, any integer type and
* \c std::string

View File

@@ -39,7 +39,7 @@ using adapter_t = typename detail::adapter_t<T>;
@endcode
*/
inline
auto adapt() noexcept
auto adapt2() noexcept
{ return detail::response_traits<void>::adapt(); }
/** \internal
@@ -75,7 +75,7 @@ auto adapt() noexcept
* @endcode
*/
template<class T>
auto adapt(T& t) noexcept
auto adapt2(T& t) noexcept
{ return detail::response_traits<T>::adapt(t); }
} // adapter

View File

@@ -146,7 +146,7 @@ public:
return;
if (is_aggregate(n.data_type)) {
ec = error::expects_simple_type;
ec = error::expects_resp3_simple_type;
return;
}
@@ -175,14 +175,14 @@ public:
if (is_aggregate(nd.data_type)) {
if (nd.data_type != resp3::type::set)
ec = error::expects_set_type;
ec = error::expects_resp3_set;
return;
}
BOOST_ASSERT(nd.aggregate_size == 1);
if (nd.depth < 1) {
ec = error::expects_set_type;
ec = error::expects_resp3_set;
return;
}
@@ -214,14 +214,14 @@ public:
if (is_aggregate(nd.data_type)) {
if (element_multiplicity(nd.data_type) != 2)
ec = error::expects_map_type;
ec = error::expects_resp3_map;
return;
}
BOOST_ASSERT(nd.aggregate_size == 1);
if (nd.depth < 1) {
ec = error::expects_map_type;
ec = error::expects_resp3_map;
return;
}
@@ -294,7 +294,7 @@ public:
}
} else {
if (i_ == -1) {
ec = error::expects_aggregate_type;
ec = error::expects_resp3_aggregate;
return;
}
@@ -324,7 +324,7 @@ struct list_impl {
if (!is_aggregate(nd.data_type)) {
BOOST_ASSERT(nd.aggregate_size == 1);
if (nd.depth < 1) {
ec = error::expects_aggregate_type;
ec = error::expects_resp3_aggregate;
return;
}

View File

@@ -17,6 +17,7 @@
#include <boost/assert.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <aedis/adapt.hpp>
@@ -25,13 +26,20 @@
namespace aedis {
/** @brief A high level Redis connection class.
/** @brief A high level connection to Redis.
* @ingroup any
*
* This class keeps a healthy connection to the Redis instance where
* commands can be sent at any time. For more details, please see the
* documentation of each individual function.
*
* @remarks This class exposes only asynchronous member functions,
* synchronous communications with the Redis server is provided by
* the sync class.
*
* @tparam AsyncReadWriteStream A stream that supports
* `async_read_some` and `async_write_some`.
*
*/
template <class AsyncReadWriteStream = boost::asio::ip::tcp::socket>
class connection {
@@ -42,20 +50,13 @@ public:
/// Type of the next layer
using next_layer_type = AsyncReadWriteStream;
using default_completion_token_type = boost::asio::default_completion_token_t<executor_type>;
using channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, std::size_t)>;
using clock_type = std::chrono::steady_clock;
using clock_traits_type = boost::asio::wait_traits<clock_type>;
using timer_type = boost::asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
using resolver_type = boost::asio::ip::basic_resolver<boost::asio::ip::tcp, executor_type>;
/** @brief Connection configuration parameters.
*/
struct config {
/// The Redis server address.
/// Redis server address.
std::string host = "127.0.0.1";
/// The Redis server port.
/// Redis server port.
std::string port = "6379";
/// Username if authentication is required.
@@ -70,40 +71,54 @@ public:
/// Timeout of the connect operation.
std::chrono::milliseconds connect_timeout = std::chrono::seconds{10};
/// Time interval ping operations.
/// Time interval of ping operations.
std::chrono::milliseconds ping_interval = std::chrono::seconds{1};
/// Time waited before trying a reconnection (see enable reconnect).
/// Time waited before trying a reconnection (see config::enable_reconnect).
std::chrono::milliseconds reconnect_interval = std::chrono::seconds{1};
/// The maximum size allowed on read operations.
/// The maximum size of read operations.
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
/// Whether to coalesce requests (see [pipelines](https://redis.io/topics/pipelining)).
bool coalesce_requests = true;
/// Enable events
/// Enable internal events, see connection::async_receive_event.
bool enable_events = false;
/// Enable automatic reconnection (see also reconnect_interval).
/// Enable automatic reconnection (see also config::reconnect_interval).
bool enable_reconnect = false;
};
/// Events communicated through \c async_receive_event.
/// Events that are communicated by `connection::async_receive_event`.
enum class event {
/// The address has been successfully resolved.
/// Resolve operation was successful.
resolve,
/// Connected to the Redis server.
/// Connect operation was successful.
connect,
/// Success sending AUTH and HELLO.
hello,
/// A push event has been received.
push,
/// Used internally.
invalid
};
/** \brief Construct a connection from an executor.
/** @brief Async operations exposed by this class.
*
* The operations listed below can be cancelled with the `cancel`
* member function.
*/
enum class operation {
/// `connection::async_exec` operations.
exec,
/// `connection::async_run` operations.
run,
/// `connection::async_receive_event` operations.
receive_event,
/// `connection::async_receive_push` operations.
receive_push,
};
/** \brief Contructor
*
* \param ex The executor.
* \param cfg Configuration parameters.
@@ -115,6 +130,7 @@ public:
, writer_timer_{ex}
, read_timer_{ex}
, push_channel_{ex}
, event_channel_{ex}
, cfg_{cfg}
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
{
@@ -122,7 +138,7 @@ public:
read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
/** \brief Construct a connection from an io_context.
/** \brief Constructor
*
* \param ioc The executor.
* \param cfg Configuration parameters.
@@ -134,29 +150,114 @@ public:
/// Returns the executor.
auto get_executor() {return resv_.get_executor();}
/** @brief Cancel operations.
*
* @li `operation::exec`: Cancels operations started with `async_exec`.
*
* @li operation::run: Cancels `async_run`. Notice that the
* preferred way to close a connection is to ensure
* `config::enable_reconnect` is set to `false` and send `QUIT`
* to the server. An unresponsive Redis server will also cause
* the idle-checks to kick in and lead to
* `connection::async_run` completing with
* `error::idle_timeout`. Calling `cancel(operation::run)`
* directly should be seen as the last option.
*
* @li operation::receive_event: Cancels `connection::async_receive_event`.
*
* @param op: The operation to be cancelled.
* @returns The number of operations that have been canceled.
*/
std::size_t cancel(operation op)
{
switch (op) {
case operation::exec:
{
for (auto& e: reqs_) {
e->stop = true;
e->timer.cancel_one();
}
auto const ret = reqs_.size();
reqs_ = {};
return ret;
}
case operation::run:
{
if (socket_)
socket_->close();
read_timer_.cancel();
check_idle_timer_.cancel();
writer_timer_.cancel();
ping_timer_.cancel();
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !ptr->req->close_on_run_completion;
});
// Cancel own pings if there are any waiting.
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->stop = true;
ptr->timer.cancel();
});
reqs_.erase(point, std::end(reqs_));
return 1U;
}
case operation::receive_event:
{
event_channel_.cancel();
return 1U;
}
case operation::receive_push:
{
push_channel_.cancel();
return 1U;
}
}
return 0;
}
/// Get the config object.
config& get_config() noexcept { return cfg_;}
/// Gets the config object.
config const& get_config() const noexcept { return cfg_;}
/** @name Asynchronous functions
*
* Each of these operations a individually cancellable.
**/
/// @{
/** @brief Starts communication with the Redis server asynchronously.
*
* This function performs the following steps
*
* \li Resolves the Redis host as of \c async_resolve with the
* timeout passed in connection::config::resolve_timeout.
* @li Resolves the Redis host as of `async_resolve` with the
* timeout passed in `config::resolve_timeout`.
*
* \li Connects to one of the endpoints returned by the resolve
* operation with the timeout passed in connection::config::connect_timeout.
* @li Connects to one of the endpoints returned by the resolve
* operation with the timeout passed in `config::connect_timeout`.
*
* \li Starts the idle check operation with the timeout of twice
* the value of connection::config::ping_interval. If no data is
* received during that time interval \c async_run completes with
* error::idle_timeout.
* @li Starts healthy checks with a timeout twice
* the value of `config::ping_interval`. If no data is
* received during that time interval `connection::async_run` completes with
* `error::idle_timeout`.
*
* \li Starts the healthy check operation that sends command::ping
* to Redis with a frequency equal to
* connection::config::ping_interval.
* @li Starts the healthy check operation that sends `PING`s to
* Redis with a frequency equal to `config::ping_interval`.
*
* \li Starts reading from the socket and delivering events to the
* request started with \c async_exec and \c async_receive_event.
* @li Starts reading from the socket and executes all requests
* that have been started prior to this function call.
*
* For an example see echo_server.cpp.
* @remark When a timeout occur and config::enable_reconnect is
* set, this function will automatically try a reconnection
* without returning control to the user.
*
* For an example see echo_server.cpp.
*
* \param token Completion token.
*
@@ -165,10 +266,8 @@ public:
* @code
* void f(boost::system::error_code);
* @endcode
*
* \return This function returns only when there is an error.
*/
template <class CompletionToken = default_completion_token_type>
template <class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_run(CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
@@ -177,7 +276,44 @@ public:
>(detail::run_op<connection>{this}, token, resv_);
}
/** @brief Connects and executes a request asynchronously.
*
* Combines the other `async_run` overload with `async_exec` in a
* single function. This function is useful for users that want to
* send a single request to the server and close it.
*
* \param req Request object.
* \param adapter Response adapter.
* \param token Asio completion token.
*
* For an example see intro.cpp. The completion token must have
* the following signature
*
* @code
* void f(boost::system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the response in bytes.
*/
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_run(
resp3::request const& req,
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::runexec_op<connection, Adapter>
{this, &req, adapter}, token, resv_);
}
/** @brief Executes a command on the redis server asynchronously.
*
* There is no need to synchronize multiple calls to this
* function as it keeps an internal queue.
*
* \param req Request object.
* \param adapter Response adapter.
@@ -195,7 +331,7 @@ public:
*/
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_exec(
resp3::request const& req,
Adapter adapter = adapt(),
@@ -209,44 +345,11 @@ public:
>(detail::exec_op<connection, Adapter>{this, &req, adapter}, token, resv_);
}
/** @brief Connects and executes a request asynchronously.
/** @brief Receives server side pushes asynchronously.
*
* Combines \c async_run and the other \c async_exec overload in a
* single function. This function is useful for users that want to
* send a single request to the server and close it.
*
* \param req Request object.
* \param adapter Response adapter.
* \param token Asio completion token.
*
* For an example see intro.cpp. The completion token must have the following signature
*
* @code
* void f(boost::system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the response in bytes.
*/
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
auto async_run(
resp3::request const& req,
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::runexec_op<connection, Adapter>
{this, &req, adapter}, token, resv_);
}
/** @brief Receives unsolicited events asynchronously.
*
* Users that expect unsolicited events should call this function
* in a loop. If an unsolicited events comes in and there is no
* reader, the connection will hang and eventually timeout.
* Users that expect server pushes have to call this function in a
* loop. If an unsolicited event comes in and there is no reader,
* the connection will hang and eventually timeout.
*
* \param adapter The response adapter.
* \param token The Asio completion token.
@@ -255,16 +358,16 @@ public:
* have the following signature
*
* @code
* void f(boost::system::error_code, event);
* void f(boost::system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the response that has
* just been read.
* Where the second parameter is the size of the push in
* bytes.
*/
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
auto async_receive_event(
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_receive_push(
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
{
@@ -277,79 +380,38 @@ public:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, event)
>(detail::receive_op<connection, decltype(f)>{this, f}, token, resv_);
, void(boost::system::error_code, std::size_t)
>(detail::receive_push_op<connection, decltype(f)>{this, f}, token, resv_);
}
/** @brief Cancel all pending request.
/** @brief Receives internal events.
*
* \returns The number of requests that have been canceled.
* See enum \c events for the list of events.
*
* \param token The Asio completion token.
*
* The completion token must have the following signature
*
* @code
* void f(boost::system::error_code, event);
* @endcode
*/
std::size_t cancel_execs()
template <class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
auto async_receive_event(CompletionToken token = CompletionToken{})
{
for (auto& e: reqs_) {
e->stop = true;
e->timer.cancel_one();
}
auto const ret = reqs_.size();
reqs_ = {};
return ret;
return event_channel_.async_receive(token);
}
/** @brief Closes the connection with the database.
*
* Calling this function will cause \c async_run to return. It is
* safe to try a reconnect after that, when that happens, all
* pending request will be automatically sent.
*
* Calling this function will causes @c async_receive_event to return
* with @c boost::asio::experimental::channel_errc::channel_cancelled.
*
* \remarks
*
* The prefered way to close a connection is to send a \c quit
* command if you are actively closing it. Otherwise an
* unresponsive Redis server will cause the idle-checks to kick in
* and lead to \c async_run returning with idle_timeout.
*
*/
void cancel_run()
{
if (socket_)
socket_->close();
read_timer_.cancel();
check_idle_timer_.cancel();
writer_timer_.cancel();
ping_timer_.cancel();
// Cancel own pings if there is any waiting.
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !ptr->req->close_on_run_completion;
});
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->stop = true;
ptr->timer.cancel();
});
reqs_.erase(point, std::end(reqs_));
}
/// Cancels the event receiver.
void cancel_event_receiver()
{
push_channel_.cancel();
}
/// Get the config object.
config& get_config() noexcept { return cfg_;}
/// Gets the config object.
config const& get_config() const noexcept { return cfg_;}
/// @}
private:
using clock_type = std::chrono::steady_clock;
using clock_traits_type = boost::asio::wait_traits<clock_type>;
using timer_type = boost::asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
using resolver_type = boost::asio::ip::basic_resolver<boost::asio::ip::tcp, executor_type>;
using push_channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, std::size_t)>;
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
using event_channel_type = boost::asio::experimental::channel<executor_type, void(boost::system::error_code, event)>;
struct req_info {
req_info(executor_type ex) : timer{ex} {}
timer_type timer;
@@ -359,10 +421,9 @@ private:
bool written = false;
};
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
using reqs_type = std::deque<std::shared_ptr<req_info>>;
template <class T, class U> friend struct detail::receive_op;
template <class T, class U> friend struct detail::receive_push_op;
template <class T> friend struct detail::reader_op;
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::ping_op;
@@ -377,7 +438,7 @@ private:
template <class T> friend struct detail::start_op;
template <class T> friend struct detail::send_receive_op;
template <class CompletionToken = default_completion_token_type>
template <class CompletionToken>
auto async_run_one(CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
@@ -505,14 +566,14 @@ private:
timer_type check_idle_timer_;
timer_type writer_timer_;
timer_type read_timer_;
channel_type push_channel_;
push_channel_type push_channel_;
event_channel_type event_channel_;
config cfg_;
std::string read_buffer_;
std::string write_buffer_;
std::size_t cmds_ = 0;
reqs_type reqs_;
event last_event_ = event::invalid;
// Last time we received data.
time_point_type last_data_;
@@ -523,7 +584,9 @@ private:
resp3::request req_;
};
/// Converts a connection event to a string.
/** @brief Converts a connection event to a string.
* @relates connection
*/
template <class T>
char const* to_string(typename connection<T>::event e)
{
@@ -538,7 +601,9 @@ char const* to_string(typename connection<T>::event e)
}
}
/// Writes a connection event to the stream.
/** @brief Writes a connection event to the stream.
* @relates connection
*/
template <class T>
std::ostream& operator<<(std::ostream& os, typename connection<T>::event e)
{

View File

@@ -48,7 +48,8 @@ struct connect_with_timeout_op {
{
BOOST_ASSERT(conn->socket_ != nullptr);
conn->ping_timer_.expires_after(conn->cfg_.connect_timeout);
yield aedis::detail::async_connect(*conn->socket_, conn->ping_timer_, conn->endpoints_, std::move(self));
yield
aedis::detail::async_connect(*conn->socket_, conn->ping_timer_, conn->endpoints_, std::move(self));
self.complete(ec);
}
}
@@ -78,7 +79,7 @@ struct resolve_with_timeout_op {
};
template <class Conn, class Adapter>
struct receive_op {
struct receive_push_op {
Conn* conn = nullptr;
Adapter adapter;
std::size_t read_size = 0;
@@ -92,26 +93,27 @@ struct receive_op {
{
reenter (coro)
{
yield conn->push_channel_.async_receive(std::move(self));
yield
conn->push_channel_.async_receive(std::move(self));
if (ec) {
self.complete(ec, Conn::event::invalid);
self.complete(ec, 0);
return;
}
if (conn->last_event_ == Conn::event::push) {
BOOST_ASSERT(conn->socket_ != nullptr);
yield resp3::async_read(*conn->socket_, conn->make_dynamic_buffer(), adapter, std::move(self));
if (ec) {
conn->cancel_run();
self.complete(ec, Conn::event::invalid);
return;
}
read_size = n;
BOOST_ASSERT(conn->socket_ != nullptr);
yield
resp3::async_read(*conn->socket_, conn->make_dynamic_buffer(), adapter, std::move(self));
if (ec) {
conn->cancel(Conn::operation::run);
self.complete(ec, 0);
return;
}
yield conn->push_channel_.async_send({}, 0, std::move(self));
self.complete(ec, conn->last_event_);
read_size = n;
yield
conn->push_channel_.async_send({}, 0, std::move(self));
self.complete(ec, read_size);
return;
}
}
@@ -145,22 +147,23 @@ struct exec_read_op {
// some data in the read bufer.
if (conn->read_buffer_.empty()) {
BOOST_ASSERT(conn->socket_ != nullptr);
yield boost::asio::async_read_until(*conn->socket_, conn->make_dynamic_buffer(), "\r\n", std::move(self));
yield
boost::asio::async_read_until(*conn->socket_, conn->make_dynamic_buffer(), "\r\n", std::move(self));
if (ec) {
conn->cancel_run();
conn->cancel(Conn::operation::run);
self.complete(ec, 0);
return;
}
}
// If the next request is a push we have to handle it to
// the receive_op wait for it to be done and continue.
// the receive_push_op wait for it to be done and continue.
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push) {
conn->last_event_ = Conn::event::push;
yield async_send_receive(conn->push_channel_, std::move(self));
yield
async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
// Notice we don't call cancel_run() as that is the
// responsability of the receive_op.
// responsability of the receive_push_op.
self.complete(ec, 0);
return;
}
@@ -177,7 +180,7 @@ struct exec_read_op {
++index;
if (ec) {
conn->cancel_run();
conn->cancel(Conn::operation::run);
self.complete(ec, 0);
return;
}
@@ -222,7 +225,8 @@ struct exec_op {
info->stop = false;
conn->add_request_info(info);
yield info->timer.async_wait(std::move(self));
yield
info->timer.async_wait(std::move(self));
BOOST_ASSERT(conn->socket_ != nullptr);
BOOST_ASSERT(!!ec);
if (info->stop) {
@@ -240,7 +244,8 @@ struct exec_op {
BOOST_ASSERT(!conn->reqs_.empty());
BOOST_ASSERT(conn->reqs_.front() != nullptr);
BOOST_ASSERT(conn->cmds_ != 0);
yield conn->async_exec_read(adapter, conn->reqs_.front()->cmds, std::move(self));
yield
conn->async_exec_read(adapter, conn->reqs_.front()->cmds, std::move(self));
if (ec) {
self.complete(ec, 0);
return;
@@ -279,7 +284,8 @@ struct ping_op {
reenter (coro) for (;;)
{
conn->ping_timer_.expires_after(conn->cfg_.ping_interval);
yield conn->ping_timer_.async_wait(std::move(self));
yield
conn->ping_timer_.async_wait(std::move(self));
BOOST_ASSERT(conn->socket_ != nullptr);
if (ec || !conn->socket_->is_open()) {
self.complete(ec);
@@ -289,7 +295,8 @@ struct ping_op {
conn->req_.clear();
conn->req_.push("PING");
conn->req_.close_on_run_completion = true;
yield conn->async_exec(conn->req_, aedis::adapt(), std::move(self));
yield
conn->async_exec(conn->req_, adapt(), std::move(self));
if (ec) {
// Notice we don't report error but let the idle check
// timeout. It is enough to finish the op.
@@ -311,7 +318,8 @@ struct check_idle_op {
reenter (coro) for (;;)
{
conn->check_idle_timer_.expires_after(2 * conn->cfg_.ping_interval);
yield conn->check_idle_timer_.async_wait(std::move(self));
yield
conn->check_idle_timer_.async_wait(std::move(self));
BOOST_ASSERT(conn->socket_ != nullptr);
if (ec || !conn->socket_->is_open()) {
// Notice this is not an error, it was requested from an
@@ -322,7 +330,7 @@ struct check_idle_op {
auto const now = std::chrono::steady_clock::now();
if (conn->last_data_ + (2 * conn->cfg_.ping_interval) < now) {
conn->cancel_run();
conn->cancel(Conn::operation::run);
self.complete(error::idle_timeout);
return;
}
@@ -358,22 +366,10 @@ struct start_op {
std::move(self));
switch (order[0]) {
case 0:
{
self.complete(ec0);
} break;
case 1:
{
self.complete(ec1);
} break;
case 2:
{
self.complete(ec2);
} break;
case 3:
{
self.complete(ec3);
} break;
case 0: self.complete(ec0); break;
case 1: self.complete(ec1); break;
case 2: self.complete(ec2); break;
case 3: self.complete(ec3); break;
default: BOOST_ASSERT(false);
}
}
@@ -393,16 +389,17 @@ struct run_one_op {
{
reenter (coro)
{
yield conn->async_resolve_with_timeout(std::move(self));
yield
conn->async_resolve_with_timeout(std::move(self));
if (ec) {
conn->cancel_run();
conn->cancel(Conn::operation::run);
self.complete(ec);
return;
}
if (conn->cfg_.enable_events) {
conn->last_event_ = Conn::event::resolve;
yield async_send_receive(conn->push_channel_, std::move(self));
yield
conn->event_channel_.async_send({}, Conn::event::resolve, std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -411,16 +408,17 @@ struct run_one_op {
conn->socket_ = std::make_shared<typename Conn::next_layer_type>(conn->resv_.get_executor());
yield conn->async_connect_with_timeout(std::move(self));
yield
conn->async_connect_with_timeout(std::move(self));
if (ec) {
conn->cancel_run();
conn->cancel(Conn::operation::run);
self.complete(ec);
return;
}
if (conn->cfg_.enable_events) {
conn->last_event_ = Conn::event::connect;
yield async_send_receive(conn->push_channel_, std::move(self));
yield
conn->event_channel_.async_send({}, Conn::event::connect, std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -439,31 +437,34 @@ struct run_one_op {
*conn->socket_,
conn->ping_timer_,
conn->req_,
adapter::adapt(),
adapter::adapt2(),
conn->make_dynamic_buffer(),
std::move(self)
);
if (ec) {
conn->cancel_run();
conn->cancel(Conn::operation::run);
self.complete(ec);
return;
}
if (conn->cfg_.enable_events) {
conn->last_event_ = Conn::event::hello;
yield async_send_receive(conn->push_channel_, std::move(self));
yield
conn->event_channel_.async_send({}, Conn::event::hello, std::move(self));
if (ec) {
self.complete(ec);
return;
}
}
conn->write_buffer_.clear();
conn->cmds_ = 0;
std::for_each(std::begin(conn->reqs_), std::end(conn->reqs_), [](auto const& ptr) {
return ptr->written = false;
});
yield conn->async_start(std::move(self));
yield
conn->async_start(std::move(self));
self.complete(ec);
}
}
@@ -482,7 +483,8 @@ struct run_op {
{
reenter (coro) for(;;)
{
yield conn->async_run_one(std::move(self));
yield
conn->async_run_one(std::move(self));
if (!conn->cfg_.enable_reconnect) {
self.complete(ec);
@@ -493,7 +495,8 @@ struct run_op {
// event here.
conn->ping_timer_.expires_after(conn->cfg_.reconnect_interval);
yield conn->ping_timer_.async_wait(std::move(self));
yield
conn->ping_timer_.async_wait(std::move(self));
}
}
};
@@ -515,7 +518,8 @@ struct writer_op {
{
while (!conn->reqs_.empty() && conn->cmds_ == 0 && conn->write_buffer_.empty()) {
conn->coalesce_requests();
yield boost::asio::async_write(*conn->socket_, boost::asio::buffer(conn->write_buffer_), std::move(self));
yield
boost::asio::async_write(*conn->socket_, boost::asio::buffer(conn->write_buffer_), std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -529,7 +533,8 @@ struct writer_op {
}
if (conn->socket_->is_open()) {
yield conn->writer_timer_.async_wait(std::move(self));
yield
conn->writer_timer_.async_wait(std::move(self));
// The timer may be canceled either to stop the write op
// or to proceed to the next write, the difference between
// the two is that for the former the socket will be
@@ -562,9 +567,10 @@ struct reader_op {
reenter (coro) for (;;)
{
BOOST_ASSERT(conn->socket_->is_open());
yield boost::asio::async_read_until(*conn->socket_, conn->make_dynamic_buffer(), "\r\n", std::move(self));
yield
boost::asio::async_read_until(*conn->socket_, conn->make_dynamic_buffer(), "\r\n", std::move(self));
if (ec) {
conn->cancel_run();
conn->cancel(Conn::operation::run);
self.complete(ec);
return;
}
@@ -592,8 +598,8 @@ struct reader_op {
if (resp3::to_type(conn->read_buffer_.front()) == resp3::type::push
|| conn->reqs_.empty()
|| (!conn->reqs_.empty() && conn->reqs_.front()->cmds == 0)) {
conn->last_event_ = Conn::event::push;
yield async_send_receive(conn->push_channel_, std::move(self));
yield
async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -603,7 +609,8 @@ struct reader_op {
BOOST_ASSERT(!conn->reqs_.empty());
BOOST_ASSERT(conn->reqs_.front()->cmds != 0);
conn->reqs_.front()->timer.cancel_one();
yield conn->read_timer_.async_wait(std::move(self));
yield
conn->read_timer_.async_wait(std::move(self));
if (!conn->socket_->is_open()) {
self.complete({});
return;
@@ -641,14 +648,8 @@ struct runexec_op {
std::move(self));
switch (order[0]) {
case 0:
{
self.complete(ec1, n);
} break;
case 1:
{
self.complete(ec2, n);
} break;
case 0: self.complete(ec1, n); break;
case 1: self.complete(ec2, n); break;
default: BOOST_ASSERT(false);
}
}

View File

@@ -56,26 +56,18 @@ struct connect_op {
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, ep);
return;
}
} break;
case 0: self.complete(ec1, ep); break;
case 1:
{
if (!ec2) {
if (ec2)
self.complete({}, ep);
else
self.complete(error::connect_timeout, ep);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, ep);
}
}
};
@@ -142,13 +134,15 @@ struct send_receive_op {
{
reenter (coro)
{
yield channel->async_send(boost::system::error_code{}, 0, std::move(self));
yield
channel->async_send(boost::system::error_code{}, 0, std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
yield channel->async_receive(std::move(self));
yield
channel->async_receive(std::move(self));
self.complete(ec, 0);
}
}

View File

@@ -25,6 +25,9 @@ enum class error
/// Idle timeout.
idle_timeout,
/// Exec timeout.
exec_timeout,
/// Invalid RESP3 type.
invalid_data_type,
@@ -44,16 +47,16 @@ enum class error
empty_field,
/// Expects a simple RESP3 type but got an aggregate.
expects_simple_type,
expects_resp3_simple_type,
/// Expects aggregate type.
expects_aggregate_type,
/// Expects aggregate.
expects_resp3_aggregate,
/// Expects a map but got other aggregate.
expects_map_type,
expects_resp3_map,
/// Expects a set aggregate but got something else.
expects_set_type,
expects_resp3_set,
/// Nested response not supported.
nested_aggregate_unsupported,
@@ -70,7 +73,7 @@ enum class error
/// Not a double
not_a_double,
/// Got RESP3 null type.
/// Got RESP3 null.
null
};

View File

@@ -1,151 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_EXPERIMENTAL_SYNC_HPP
#define AEDIS_EXPERIMENTAL_SYNC_HPP
#include <aedis/adapt.hpp>
#include <aedis/connection.hpp>
#include <aedis/resp3/request.hpp>
namespace aedis {
namespace experimental {
namespace detail {
struct sync {
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
};
} // detail
/** @brief Executes a command.
* @ingroup any
*
* This function will block until execution completes.
*
* @param conn The connection.
* @param req The request.
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes of the response.
*/
template <class Connection, class ResponseAdapter>
std::size_t
exec(
Connection& conn,
resp3::request const& req,
ResponseAdapter adapter,
boost::system::error_code& ec)
{
detail::sync sh;
std::size_t res = 0;
auto f = [&conn, &ec, &res, &sh, &req, adapter]()
{
conn.async_exec(req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Executes a command.
* @ingroup any
*
* This function will block until execution completes.
*
* @param conn The connection.
* @param req The request.
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes of the response.
*/
template <
class Connection,
class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
std::size_t
exec(
Connection& conn,
resp3::request const& req,
ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = exec(conn, req, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Receives server pushes synchronusly.
* @ingroup any
*
* This function will block until execution completes.
*
* @param conn The connection.
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes of the response.
*/
template <class Connection, class ResponseAdapter>
auto receive_event(
Connection& conn,
ResponseAdapter adapter,
boost::system::error_code& ec)
{
using event_type = typename Connection::event;
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
event_type ev = event_type::invalid;
auto f = [&conn, &ec, &ev, &mutex, &cv, &ready, adapter]()
{
conn.async_receive_event(adapter, [&cv, &mutex, &ready, &ev, &ec](auto const& ecp, event_type evp) {
std::unique_lock ul(mutex);
ec = ecp;
ev = evp;
ready = true;
ul.unlock();
cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f));
std::unique_lock lk(mutex);
cv.wait(lk, [&ready]{return ready;});
return ev;
}
/// TODO
template <
class Connection,
class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
auto receive_event(
Connection& conn,
ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = receive_event(conn, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
} // experimental
} // aedis
#endif // AEDIS_EXPERIMENTAL_SYNC_HPP

View File

@@ -22,16 +22,17 @@ struct error_category_impl : boost::system::error_category {
case error::resolve_timeout: return "Resolve operation timeout.";
case error::connect_timeout: return "Connect operation timeout.";
case error::idle_timeout: return "Idle timeout.";
case error::exec_timeout: return "Exec timeout.";
case error::invalid_data_type: return "Invalid resp3 type.";
case error::not_a_number: return "Can't convert string to number.";
case error::unexpected_read_size: return "Unexpected read size.";
case error::exceeeds_max_nested_depth: return "Exceeds the maximum number of nested responses.";
case error::unexpected_bool_value: return "Unexpected bool value.";
case error::empty_field: return "Expected field value is empty.";
case error::expects_simple_type: return "Expects a simple RESP3 type.";
case error::expects_aggregate_type: return "Expects aggregate type.";
case error::expects_map_type: return "Expects map type.";
case error::expects_set_type: return "Expects set type.";
case error::expects_resp3_simple_type: return "Expects a resp3 simple type.";
case error::expects_resp3_aggregate: return "Expects resp3 aggregate.";
case error::expects_resp3_map: return "Expects resp3 map.";
case error::expects_resp3_set: return "Expects resp3 set.";
case error::nested_aggregate_unsupported: return "Nested aggregate unsupported.";
case error::simple_error: return "Got RESP3 simple-error.";
case error::blob_error: return "Got RESP3 blob-error.";

View File

@@ -1,164 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_RESP3_COMPOSE_HPP
#define AEDIS_RESP3_COMPOSE_HPP
#include <string>
#include <tuple>
#include <boost/hana.hpp>
#include <boost/utility/string_view.hpp>
#include <aedis/resp3/type.hpp>
namespace aedis {
namespace resp3 {
constexpr char separator[] = "\r\n";
/** @brief Adds a bulk to the request.
* @ingroup any
*
* This function is useful in serialization of your own data
* structures in a request. For example
*
* @code
* void to_bulk(std::string& to, mystruct const& obj)
* {
* auto const str = // Convert obj to a string.
* resp3::to_bulk(to, str);
* }
* @endcode
*
* @param to Storage on which data will be copied into.
* @param data Data that will be serialized and stored in @c to.
*
* See more in \ref requests-serialization.
*/
template <class Request>
void to_bulk(Request& to, boost::string_view data)
{
auto const str = std::to_string(data.size());
to += to_code(type::blob_string);
to.append(std::cbegin(str), std::cend(str));
to += separator;
to.append(std::cbegin(data), std::cend(data));
to += separator;
}
template <class Request, class T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
void to_bulk(Request& to, T n)
{
auto const s = std::to_string(n);
to_bulk(to, boost::string_view{s});
}
namespace detail {
template <class T>
struct add_bulk_impl {
template <class Request>
static void add(Request& to, T const& from)
{
using namespace aedis::resp3;
to_bulk(to, from);
}
};
template <class U, class V>
struct add_bulk_impl<std::pair<U, V>> {
template <class Request>
static void add(Request& to, std::pair<U, V> const& from)
{
using namespace aedis::resp3;
to_bulk(to, from.first);
to_bulk(to, from.second);
}
};
template <class ...Ts>
struct add_bulk_impl<boost::hana::tuple<Ts...>> {
template <class Request>
static void add(Request& to, boost::hana::tuple<Ts...> const& from)
{
using boost::hana::for_each;
// Fold expressions is C++17 so we use hana.
//(resp3::add_bulk(*request_, args), ...);
for_each(from, [&](auto const& e) {
using namespace aedis::resp3;
to_bulk(to, e);
});
}
};
} // detail
/** \internal
* \brief Adds a resp3 header to the request.
* \ingroup any
*
* See mystruct.hpp for an example.
*/
template <class Request>
void add_header(Request& to, type t, std::size_t size)
{
auto const str = std::to_string(size);
to += to_code(t);
to.append(std::cbegin(str), std::cend(str));
to += separator;
}
/* Adds a rep3 bulk to the request.
*
* This function adds \c data as a bulk string to the request \c to.
*/
template <class Request, class T>
void add_bulk(Request& to, T const& data)
{
detail::add_bulk_impl<T>::add(to, data);
}
template <class>
struct bulk_counter;
template <class>
struct bulk_counter {
static constexpr auto size = 1U;
};
template <class T, class U>
struct bulk_counter<std::pair<T, U>> {
static constexpr auto size = 2U;
};
template <class Request>
void add_blob(Request& to, boost::string_view blob)
{
to.append(std::cbegin(blob), std::cend(blob));
to += separator;
}
/** \internal
* \brief Adds a separator to the request.
* \ingroup any
*
* See mystruct.hpp for an example.
*/
template <class Request>
void add_separator(Request& to)
{
to += separator;
}
} // resp3
} // aedis
#endif // AEDIS_RESP3_COMPOSE_HPP

View File

@@ -129,26 +129,18 @@ struct exec_with_timeout_op {
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1, 0);
return;
}
} break;
case 0: self.complete(ec1, n); break;
case 1:
{
if (!ec2) {
self.complete(aedis::error::idle_timeout, 0);
return;
}
if (ec2)
self.complete({}, n);
else
self.complete(aedis::error::exec_timeout, 0);
} break;
default: BOOST_ASSERT(false);
}
self.complete({}, n);
}
}
};

View File

@@ -12,27 +12,25 @@ namespace resp3 {
char const* to_string(type t)
{
static char const* table[] =
{ "array"
, "push"
, "set"
, "map"
, "attribute"
, "simple_string"
, "simple_error"
, "number"
, "doublean"
, "boolean"
, "big_number"
, "null"
, "blob_error"
, "verbatim_string"
, "blob_string"
, "streamed_string_part"
, "invalid"
};
return table[static_cast<int>(t)];
switch (t) {
case type::array: return "array";
case type::push: return "push";
case type::set: return "set";
case type::map: return "map";
case type::attribute: return "attribute";
case type::simple_string: return "simple_string";
case type::simple_error: return "simple_error";
case type::number: return "number";
case type::doublean: return "doublean";
case type::boolean: return "boolean";
case type::big_number: return "big_number";
case type::null: return "null";
case type::blob_error: return "blob_error";
case type::verbatim_string: return "verbatim_string";
case type::blob_string: return "blob_string";
case type::streamed_string_part: return "streamed_string_part";
default: return "invalid";
}
}
std::ostream& operator<<(std::ostream& os, type t)

View File

@@ -39,10 +39,10 @@ struct node {
String value;
};
/** \brief Converts the node to a string.
* \ingroup any
/** @brief Converts the node to a string.
* @relates node
*
* \param in The node object.
* @param in The node object.
*/
template <class String>
std::string to_string(node<String> const& in)
@@ -60,8 +60,8 @@ std::string to_string(node<String> const& in)
return out;
}
/** \brief Compares a node for equality.
* \ingroup any
/** @brief Compares a node for equality.
* @relates node
*
* @param a Left hand side node object.
* @param b Right hand side node object.
@@ -75,8 +75,8 @@ bool operator==(node<String> const& a, node<String> const& b)
&& a.value == b.value;
};
/** \brief Writes the node string to the stream.
* \ingroup any
/** @brief Writes the node string to the stream.
* @relates node
*
* @param os Output stream.
* @param node Node object.

View File

@@ -7,10 +7,14 @@
#ifndef AEDIS_RESP3_REQUEST_HPP
#define AEDIS_RESP3_REQUEST_HPP
#include <aedis/resp3/compose.hpp>
#include <string>
#include <tuple>
#include <boost/hana.hpp>
#include <boost/utility/string_view.hpp>
#include <aedis/resp3/type.hpp>
// NOTE: Consider detecting tuples in the type in the parameter pack
// to calculate the header size correctly.
//
@@ -19,10 +23,130 @@
namespace aedis {
namespace resp3 {
constexpr char separator[] = "\r\n";
/** @brief Adds a bulk to the request.
* @relates request
*
* This function is useful in serialization of your own data
* structures in a request. For example
*
* @code
* void to_bulk(std::string& to, mystruct const& obj)
* {
* auto const str = // Convert obj to a string.
* resp3::to_bulk(to, str);
* }
* @endcode
*
* @param to Storage on which data will be copied into.
* @param data Data that will be serialized and stored in @c to.
*
* See more in \ref requests-serialization.
*/
template <class Request>
void to_bulk(Request& to, boost::string_view data)
{
auto const str = std::to_string(data.size());
to += to_code(type::blob_string);
to.append(std::cbegin(str), std::cend(str));
to += separator;
to.append(std::cbegin(data), std::cend(data));
to += separator;
}
template <class Request, class T, typename = typename std::enable_if<std::is_integral<T>::value>::type>
void to_bulk(Request& to, T n)
{
auto const s = std::to_string(n);
to_bulk(to, boost::string_view{s});
}
namespace detail {
bool has_push_response(boost::string_view cmd);
template <class T>
struct add_bulk_impl {
template <class Request>
static void add(Request& to, T const& from)
{
using namespace aedis::resp3;
to_bulk(to, from);
}
};
template <class U, class V>
struct add_bulk_impl<std::pair<U, V>> {
template <class Request>
static void add(Request& to, std::pair<U, V> const& from)
{
using namespace aedis::resp3;
to_bulk(to, from.first);
to_bulk(to, from.second);
}
};
template <class ...Ts>
struct add_bulk_impl<boost::hana::tuple<Ts...>> {
template <class Request>
static void add(Request& to, boost::hana::tuple<Ts...> const& from)
{
using boost::hana::for_each;
// Fold expressions is C++17 so we use hana.
//(detail::add_bulk(*request_, args), ...);
for_each(from, [&](auto const& e) {
using namespace aedis::resp3;
to_bulk(to, e);
});
}
};
template <class Request>
void add_header(Request& to, type t, std::size_t size)
{
auto const str = std::to_string(size);
to += to_code(t);
to.append(std::cbegin(str), std::cend(str));
to += separator;
}
template <class Request, class T>
void add_bulk(Request& to, T const& data)
{
detail::add_bulk_impl<T>::add(to, data);
}
template <class>
struct bulk_counter;
template <class>
struct bulk_counter {
static constexpr auto size = 1U;
};
template <class T, class U>
struct bulk_counter<std::pair<T, U>> {
static constexpr auto size = 2U;
};
template <class Request>
void add_blob(Request& to, boost::string_view blob)
{
to.append(std::cbegin(blob), std::cend(blob));
to += separator;
}
template <class Request>
void add_separator(Request& to)
{
to += separator;
}
} // detail
/** @brief Creates Redis requests.
@@ -83,9 +207,9 @@ public:
using resp3::type;
auto constexpr pack_size = sizeof...(Ts);
resp3::add_header(payload_, type::array, 1 + pack_size);
resp3::add_bulk(payload_, cmd);
resp3::add_bulk(payload_, make_tuple(args...));
detail::add_header(payload_, type::array, 1 + pack_size);
detail::add_bulk(payload_, cmd);
detail::add_bulk(payload_, make_tuple(args...));
if (!detail::has_push_response(cmd))
++commands_;
@@ -121,14 +245,14 @@ public:
if (begin == end)
return;
auto constexpr size = resp3::bulk_counter<value_type>::size;
auto constexpr size = detail::bulk_counter<value_type>::size;
auto const distance = std::distance(begin, end);
resp3::add_header(payload_, type::array, 2 + size * distance);
resp3::add_bulk(payload_, cmd);
resp3::add_bulk(payload_, key);
detail::add_header(payload_, type::array, 2 + size * distance);
detail::add_bulk(payload_, cmd);
detail::add_bulk(payload_, key);
for (; begin != end; ++begin)
resp3::add_bulk(payload_, *begin);
detail::add_bulk(payload_, *begin);
if (!detail::has_push_response(cmd))
++commands_;
@@ -160,13 +284,13 @@ public:
if (begin == end)
return;
auto constexpr size = resp3::bulk_counter<value_type>::size;
auto constexpr size = detail::bulk_counter<value_type>::size;
auto const distance = std::distance(begin, end);
resp3::add_header(payload_, type::array, 1 + size * distance);
resp3::add_bulk(payload_, cmd);
detail::add_header(payload_, type::array, 1 + size * distance);
detail::add_bulk(payload_, cmd);
for (; begin != end; ++begin)
resp3::add_bulk(payload_, *begin);
detail::add_bulk(payload_, *begin);
if (!detail::has_push_response(cmd))
++commands_;

245
include/aedis/sync.hpp Normal file
View File

@@ -0,0 +1,245 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_SYNC_HPP
#define AEDIS_SYNC_HPP
#include <condition_variable>
#include <aedis/resp3/request.hpp>
namespace aedis {
/** @brief A high level synchronous connection to Redis.
* @ingroup any
*
* This class keeps a healthy and thread safe connection to the Redis
* instance where commands can be sent at any time. For more details,
* please see the documentation of each individual function.
*
*/
template <class Connection>
class sync {
public:
using event = typename Connection::event;
using config = typename Connection::config;
/** @brief Constructor
*
* @param ex Executor
* @param cfg Config options.
*/
template <class Executor>
sync(Executor ex, config cfg = config{}) : conn_{ex, cfg} { }
/** @brief Executes a request synchronously.
*
* The functions calls `connection::async_exec` and waits
* for its completion.
*
* @param req The request.
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter>
std::size_t
exec(resp3::request const& req, ResponseAdapter adapter, boost::system::error_code& ec)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, &ec, &res, &sh, &req, adapter]()
{
conn_.async_exec(req, adapter, [&sh, &res, &ec](auto const& ecp, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Executes a command synchronously
*
* The functions calls `connection::async_exec` and waits for its
* completion.
*
* @param req The request.
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes of the response.
*/
template <class ResponseAdapter = detail::response_traits<void>::adapter_type>
std::size_t exec(resp3::request const& req, ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = exec(req, adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Receives server pushes synchronusly.
*
* The functions calls `connection::async_receive_push` and
* waits for its completion.
*
* @param adapter The response adapter.
* @param ec Error code in case of error.
* @returns The number of bytes received.
*/
template <class ResponseAdapter>
auto receive_push(ResponseAdapter adapter, boost::system::error_code& ec)
{
sync_helper sh;
std::size_t res = 0;
auto f = [this, &ec, &res, &sh, adapter]()
{
conn_.async_receive_push(adapter, [&ec, &res, &sh](auto const& e, std::size_t n) {
std::unique_lock ul(sh.mutex);
ec = e;
res = n;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Receives server pushes synchronusly.
*
* The functions calls `connection::async_receive_push` and
* waits for its completion.
*
* @param adapter The response adapter.
* @throws std::system_error in case of error.
* @returns The number of bytes received.
*/
template <class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
auto receive_push(ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = receive_push(adapter, ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Receives events synchronously.
*
* The functions calls `connection::async_receive_event` and
* waits for its completion.
*
* @param ec Error code in case of error.
* @returns The event received.
*/
auto receive_event(boost::system::error_code& ec)
{
sync_helper sh;
auto res = event::invalid;
auto f = [this, &ec, &res, &sh]()
{
conn_.async_receive_event([&ec, &res, &sh](auto const& ecp, event ev) {
std::unique_lock ul(sh.mutex);
ec = ecp;
res = ev;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
return res;
}
/** @brief Receives events synchronously
*
* The functions calls `connection::async_receive_event` and
* waits for its completion.
*
* @throws std::system_error in case of error.
* @returns The event received.
*/
auto receive_event()
{
boost::system::error_code ec;
auto const res = receive_event(ec);
if (ec)
throw std::system_error(ec);
return res;
}
/** @brief Calls \c async_run from the underlying connection.
*
* The functions calls `connection::async_run` and waits for its
* completion.
*
* @param ec Error code.
*/
void run(boost::system::error_code& ec)
{
sync_helper sh;
auto f = [this, &ec, &sh]()
{
conn_.async_run([&ec, &sh](auto const& e) {
std::unique_lock ul(sh.mutex);
ec = e;
sh.ready = true;
ul.unlock();
sh.cv.notify_one();
});
};
boost::asio::dispatch(boost::asio::bind_executor(conn_.get_executor(), f));
std::unique_lock lk(sh.mutex);
sh.cv.wait(lk, [&sh]{return sh.ready;});
}
/** @brief Calls \c async_run from the underlying connection.
*
* The functions calls `connection::async_run` and waits for its
* completion.
*
* @throws std::system_error.
*/
void run()
{
boost::system::error_code ec;
run(ec);
if (ec)
throw std::system_error(ec);
}
private:
struct sync_helper {
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
};
Connection conn_;
};
} // aedis
#endif // AEDIS_SYNC_HPP

View File

@@ -1,73 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <iostream>
#include <stdlib.h>
void expect_true(bool a, std::string const& msg = "")
{
if (a) {
if (!msg.empty())
std::cout << "Success: " << msg << std::endl;
} else {
std::cout << "Error: " << msg << std::endl;
exit(EXIT_FAILURE);
}
}
template <class T>
void expect_eq(T const& a, T const& b, std::string const& msg = "")
{
if (a == b) {
if (!msg.empty())
std::cout << "Success: " << msg << std::endl;
} else {
std::cout << "Error: " << msg << std::endl;
exit(EXIT_FAILURE);
}
}
template <class T>
void expect_neq(T const& a, T const& b, std::string const& msg = "")
{
if (a != b) {
if (!msg.empty())
std::cout << "Success: " << msg << std::endl;
} else {
std::cout << "Error: " << msg << std::endl;
exit(EXIT_FAILURE);
}
}
template <class T>
void expect_error(boost::system::error_code a, T expected = {}, std::string const& msg = "")
{
if (a == expected) {
if (a)
std::cout << "Success: " << a.message() << " (" << a.category().name() << ") " << msg << std::endl;
} else {
std::cout << "Error: " << a.message() << " (" << a.category().name() << ") " << msg << std::endl;
exit(EXIT_FAILURE);
}
}
inline
void expect_no_error(boost::system::error_code ec, std::string const& msg)
{
expect_error(ec, boost::system::error_code{}, msg);
}
template <class T>
void check_empty(T const& t)
{
if (t.empty()) {
//std::cout << "Success: " << std::endl;
} else {
std::cout << "Error: Not empty" << std::endl;
exit(EXIT_FAILURE);
}
}

View File

@@ -14,14 +14,16 @@
#include <boost/system/errc.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#define BOOST_TEST_MODULE low level
#include <boost/test/included/unit_test.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
#include "check.hpp"
namespace net = boost::asio;
using aedis::resp3::request;
using aedis::adapt;
using connection = aedis::connection<>;
using error_code = boost::system::error_code;
using net::experimental::as_tuple;
@@ -36,7 +38,7 @@ bool is_host_not_found(boost::system::error_code ec)
//----------------------------------------------------------------
// Tests whether resolve fails with the correct error.
void test_resolve()
BOOST_AUTO_TEST_CASE(test_resolve)
{
connection::config cfg;
cfg.host = "Atibaia";
@@ -46,7 +48,7 @@ void test_resolve()
net::io_context ioc;
connection db{ioc, cfg};
db.async_run([](auto ec) {
expect_true(is_host_not_found(ec), "test_resolve");
BOOST_TEST(is_host_not_found(ec));
});
ioc.run();
@@ -54,7 +56,7 @@ void test_resolve()
//----------------------------------------------------------------
void test_connect()
BOOST_AUTO_TEST_CASE(test_connect)
{
connection::config cfg;
cfg.host = "127.0.0.1";
@@ -64,7 +66,7 @@ void test_connect()
net::io_context ioc;
connection db{ioc, cfg};
db.async_run([](auto ec) {
expect_error(ec, net::error::basic_errors::connection_refused, "test_connect");
BOOST_CHECK_EQUAL(ec, net::error::basic_errors::connection_refused);
});
ioc.run();
}
@@ -80,12 +82,12 @@ void test_quit1(connection::config const& cfg)
request req;
req.push("QUIT");
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_quit1");
db->async_exec(req, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
db->async_run([](auto ec){
expect_error(ec, net::error::misc_errors::eof, "test_quit1");
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
});
ioc.run();
@@ -99,14 +101,14 @@ void test_quit2(connection::config const& cfg)
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
db->async_run(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_quit2");
db->async_run(req, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
ioc.run();
}
void test_quit()
BOOST_AUTO_TEST_CASE(test_quit)
{
connection::config cfg;
@@ -133,8 +135,8 @@ void test_missing_push_reader1(connection::config const& cfg)
request req;
req.push("SUBSCRIBE", "channel");
db->async_run(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_missing_push_reader1");
db->async_run(req, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
ioc.run();
@@ -149,8 +151,8 @@ void test_missing_push_reader2(connection::config const& cfg)
request req; // Wrong command syntax.
req.push("SUBSCRIBE");
db->async_run(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_missing_push_reader2");
db->async_run(req, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
ioc.run();
@@ -166,83 +168,150 @@ void test_missing_push_reader3(connection::config const& cfg)
req.push("PING", "Message");
req.push("SUBSCRIBE");
db->async_run(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_missing_push_reader3");
db->async_run(req, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
ioc.run();
}
void test_idle()
BOOST_AUTO_TEST_CASE(test_idle)
{
std::cout << "test_idle" << std::endl;
connection::config cfg;
cfg.resolve_timeout = std::chrono::seconds{1};
cfg.connect_timeout = std::chrono::seconds{1};
cfg.ping_interval = std::chrono::seconds{1};
std::chrono::milliseconds ms{5000};
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
{
std::cout << "test_idle" << std::endl;
connection::config cfg;
cfg.resolve_timeout = std::chrono::seconds{1};
cfg.connect_timeout = std::chrono::seconds{1};
cfg.ping_interval = std::chrono::seconds{1};
request req;
req.push("CLIENT", "PAUSE", 5000);
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
db->async_exec(req, aedis::adapt(), [](auto ec, auto){
expect_no_error(ec, "test_idle");
});
request req;
req.push("CLIENT", "PAUSE", ms.count());
db->async_run([](auto ec){
expect_error(ec, aedis::error::idle_timeout, "test_idle");
});
db->async_exec(req, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
ioc.run();
db->async_run([](auto ec){
BOOST_CHECK_EQUAL(ec, aedis::error::idle_timeout);
});
ioc.run();
}
//----------------------------------------------------------------
// Since we have paused the server above, we have to wait until the
// server is responsive again, so as not to cause other tests to
// fail.
{
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
db->get_config().ping_interval = 2* ms;
db->get_config().resolve_timeout = 2 * ms;
db->get_config().connect_timeout = 2 * ms;
db->get_config().ping_interval = 2 * ms;
request req;
req.push("QUIT");
db->async_run(req, adapt(), [](auto ec, auto){
BOOST_TEST(!ec);
});
ioc.run();
}
}
#ifdef BOOST_ASIO_HAS_CO_AWAIT
net::awaitable<void>
push_consumer1(std::shared_ptr<connection> db, bool& received, char const* msg)
net::awaitable<void> push_consumer1(std::shared_ptr<connection> db, bool& push_received)
{
{
auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable));
expect_no_error(ec, msg);
received = true;
auto [ec, ev] = co_await db->async_receive_push(adapt(), as_tuple(net::use_awaitable));
BOOST_TEST(!ec);
}
{
auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable));
expect_error(ec, boost::asio::experimental::channel_errc::channel_cancelled, msg);
auto [ec, ev] = co_await db->async_receive_push(adapt(), as_tuple(net::use_awaitable));
BOOST_CHECK_EQUAL(ec, boost::asio::experimental::channel_errc::channel_cancelled);
}
push_received = true;
}
net::awaitable<void> event_consumer1(std::shared_ptr<connection> db, bool& event_received)
{
{
auto [ec, ev] = co_await db->async_receive_event(as_tuple(net::use_awaitable));
auto const r = ev == connection::event::resolve;
BOOST_TEST(r);
BOOST_TEST(!ec);
}
{
auto [ec, ev] = co_await db->async_receive_event(as_tuple(net::use_awaitable));
auto const r = ev == connection::event::connect;
BOOST_TEST(r);
BOOST_TEST(!ec);
}
{
auto [ec, ev] = co_await db->async_receive_event(as_tuple(net::use_awaitable));
auto const r = ev == connection::event::hello;
BOOST_TEST(r);
BOOST_TEST(!ec);
}
{
auto [ec, ev] = co_await db->async_receive_event(as_tuple(net::use_awaitable));
BOOST_CHECK_EQUAL(ec, boost::asio::experimental::channel_errc::channel_cancelled);
}
event_received = true;
}
void test_push_is_received1(connection::config const& cfg)
{
std::cout << "test_push_is_received1" << std::endl;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
db->get_config().enable_events = true;
request req;
req.push("SUBSCRIBE", "channel");
req.push("QUIT");
db->async_run(req, aedis::adapt(), [db](auto ec, auto){
expect_no_error(ec, "test_push_is_received1");
db->cancel_event_receiver();
db->async_run(req, adapt(), [db](auto ec, auto){
BOOST_TEST(!ec);
db->cancel(connection::operation::receive_event);
db->cancel(connection::operation::receive_push);
});
bool received = false;
bool push_received = false;
net::co_spawn(
ioc.get_executor(),
push_consumer1(db, received, "test_push_is_received1"),
push_consumer1(db, push_received),
net::detached);
bool event_received = false;
net::co_spawn(
ioc.get_executor(),
event_consumer1(db, event_received),
net::detached);
ioc.run();
expect_true(received);
BOOST_TEST(push_received);
BOOST_TEST(event_received);
}
void test_push_is_received2(connection::config const& cfg)
{
std::cout << "test_push_is_received2" << std::endl;
request req1;
req1.push("PING", "Message1");
@@ -256,27 +325,39 @@ void test_push_is_received2(connection::config const& cfg)
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
db->get_config().enable_events = true;
auto handler =[](auto ec, auto...)
{ expect_no_error(ec, "test_push_is_received2"); };
{
BOOST_TEST(!ec);
};
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req3, aedis::adapt(), handler);
db->async_exec(req1, adapt(), handler);
db->async_exec(req2, adapt(), handler);
db->async_exec(req3, adapt(), handler);
db->async_run([db](auto ec, auto...) {
expect_error(ec, net::error::misc_errors::eof, "test_push_is_received2");
db->cancel_event_receiver();
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
db->cancel(connection::operation::receive_event);
db->cancel(connection::operation::receive_push);
});
bool received = false;
bool push_received = false;
net::co_spawn(
ioc.get_executor(),
push_consumer1(db, received, "test_push_is_received2"),
push_consumer1(db, push_received),
net::detached);
bool event_received = false;
net::co_spawn(
ioc.get_executor(),
event_consumer1(db, event_received),
net::detached);
ioc.run();
expect_true(received);
BOOST_TEST(push_received);
BOOST_TEST(event_received);
}
net::awaitable<void> test_reconnect_impl(std::shared_ptr<connection> db)
@@ -285,16 +366,19 @@ net::awaitable<void> test_reconnect_impl(std::shared_ptr<connection> db)
req.push("QUIT");
for (auto i = 0;;) {
auto ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::resolve, "test_reconnect.");
auto ev = co_await db->async_receive_event(net::use_awaitable);
auto const r1 = ev == connection::event::resolve;
BOOST_TEST(r1);
ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::connect, "test_reconnect.");
ev = co_await db->async_receive_event(net::use_awaitable);
auto const r2 = ev == connection::event::connect;
BOOST_TEST(r2);
ev = co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
expect_eq(ev, connection::event::hello, "test_reconnect.");
ev = co_await db->async_receive_event(net::use_awaitable);
auto const r3 = ev == connection::event::hello;
BOOST_TEST(r3);
co_await db->async_exec(req, aedis::adapt(), net::use_awaitable);
co_await db->async_exec(req, adapt(), net::use_awaitable);
// Test 5 reconnetions and returns.
@@ -321,7 +405,7 @@ void test_reconnect()
net::co_spawn(ioc, test_reconnect_impl(db), net::detached);
db->async_run([](auto ec) {
expect_error(ec, net::error::misc_errors::eof, "test_reconnect.");
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
});
ioc.run();
@@ -332,7 +416,7 @@ net::awaitable<void>
push_consumer3(std::shared_ptr<connection> db)
{
for (;;)
co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
co_await db->async_receive_push(adapt(), net::use_awaitable);
}
// Test many subscribe requests.
@@ -353,27 +437,27 @@ void test_push_many_subscribes(connection::config const& cfg)
auto handler =[](auto ec, auto...)
{
expect_no_error(ec, "test_push_many_subscribes");
BOOST_TEST(!ec);
};
net::io_context ioc;
auto db = std::make_shared<connection>(ioc, cfg);
db->async_exec(req0, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req1, aedis::adapt(), handler);
db->async_exec(req2, aedis::adapt(), handler);
db->async_exec(req3, aedis::adapt(), handler);
db->async_exec(req0, adapt(), handler);
db->async_exec(req1, adapt(), handler);
db->async_exec(req2, adapt(), handler);
db->async_exec(req2, adapt(), handler);
db->async_exec(req1, adapt(), handler);
db->async_exec(req2, adapt(), handler);
db->async_exec(req1, adapt(), handler);
db->async_exec(req2, adapt(), handler);
db->async_exec(req2, adapt(), handler);
db->async_exec(req1, adapt(), handler);
db->async_exec(req2, adapt(), handler);
db->async_exec(req3, adapt(), handler);
db->async_run([db](auto ec, auto...) {
expect_error(ec, net::error::misc_errors::eof, "test_push_many_subscribes");
db->cancel_event_receiver();
BOOST_CHECK_EQUAL(ec, net::error::misc_errors::eof);
db->cancel(connection::operation::receive_push);
});
net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);
@@ -382,7 +466,7 @@ void test_push_many_subscribes(connection::config const& cfg)
#endif
void test_push()
BOOST_AUTO_TEST_CASE(test_push)
{
connection::config cfg;
@@ -405,18 +489,3 @@ void test_push()
test_missing_push_reader3(cfg);
}
int main()
{
test_resolve();
test_connect();
test_quit();
test_push();
#ifdef BOOST_ASIO_HAS_CO_AWAIT
test_reconnect();
#endif
// Must come last as it sends a client pause.
test_idle();
}

View File

@@ -7,6 +7,7 @@
#include <map>
#include <iostream>
#include <optional>
#include <sstream>
#include <boost/system/errc.hpp>
#include <boost/asio/awaitable.hpp>
@@ -15,18 +16,17 @@
#include <boost/asio/detached.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/beast/_experimental/test/stream.hpp>
#define BOOST_TEST_MODULE low level
#include <boost/test/included/unit_test.hpp>
#include <aedis.hpp>
#include <aedis/src.hpp>
#include "check.hpp"
#include "config.h"
namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using test_stream = boost::beast::test::stream;
using aedis::adapter::adapt;
using aedis::adapter::adapt2;
using node_type = aedis::resp3::node<std::string>;
//-------------------------------------------------------------------
@@ -35,7 +35,7 @@ template <class Result>
struct expect {
std::string in;
Result expected;
std::string name;
std::string name; // Currently unused.
boost::system::error_code ec{};
};
@@ -47,12 +47,13 @@ void test_sync(net::any_io_executor ex, expect<Result> e)
ts.append(e.in);
Result result;
boost::system::error_code ec;
resp3::read(ts, net::dynamic_buffer(rbuffer), adapt(result), ec);
expect_error(ec, e.ec);
resp3::read(ts, net::dynamic_buffer(rbuffer), adapt2(result), ec);
BOOST_CHECK_EQUAL(ec, e.ec);
if (e.ec)
return;
check_empty(rbuffer);
expect_eq(result, e.expected, e.name);
BOOST_TEST(rbuffer.empty());
auto const res = result == e.expected;
BOOST_TEST(res);
}
template <class Result>
@@ -76,17 +77,18 @@ public:
auto self = this->shared_from_this();
auto f = [self](auto ec, auto)
{
expect_error(ec, self->data_.ec);
BOOST_CHECK_EQUAL(ec, self->data_.ec);
if (self->data_.ec)
return;
check_empty(self->rbuffer_);
expect_eq(self->result_, self->data_.expected, self->data_.name);
BOOST_TEST(self->rbuffer_.empty());
auto const res = self->result_ == self->data_.expected;
BOOST_TEST(res);
};
resp3::async_read(
ts_,
net::dynamic_buffer(rbuffer_),
adapt(result_),
adapt2(result_),
f);
}
};
@@ -97,8 +99,9 @@ void test_async(net::any_io_executor ex, expect<Result> e)
std::make_shared<async_test<Result>>(ex, e)->run();
}
void test_number(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_number)
{
net::io_context ioc;
boost::optional<int> ok;
ok = 11;
@@ -108,12 +111,13 @@ void test_number(net::io_context& ioc)
auto const in03 = expect<int>{":11\r\n", int{11}, "number.int"};
auto const in04 = expect<boost::optional<int>>{":11\r\n", ok, "number.optional.int"};
auto const in05 = expect<std::tuple<int>>{"*1\r\n:11\r\n", std::tuple<int>{11}, "number.tuple.int"};
auto const in06 = expect<boost::optional<int>>{"%11\r\n", boost::optional<int>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_simple_type)};
auto const in07 = expect<std::set<std::string>>{":11\r\n", std::set<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_set_type)};
auto const in08 = expect<std::unordered_set<std::string>>{":11\r\n", std::unordered_set<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_set_type)};
auto const in09 = expect<std::map<std::string, std::string>>{":11\r\n", std::map<std::string, std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_map_type)};
auto const in10 = expect<std::unordered_map<std::string, std::string>>{":11\r\n", std::unordered_map<std::string, std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_map_type)};
auto const in11 = expect<std::list<std::string>>{":11\r\n", std::list<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_aggregate_type)};
auto const in06 = expect<int>{"_\r\n", int{0}, "number.int", aedis::make_error_code(aedis::error::null)};
auto const in07 = expect<boost::optional<int>>{"%11\r\n", boost::optional<int>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_simple_type)};
auto const in08 = expect<std::set<std::string>>{":11\r\n", std::set<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_set)};
auto const in09 = expect<std::unordered_set<std::string>>{":11\r\n", std::unordered_set<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_set)};
auto const in10 = expect<std::map<std::string, std::string>>{":11\r\n", std::map<std::string, std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_map)};
auto const in11 = expect<std::unordered_map<std::string, std::string>>{":11\r\n", std::unordered_map<std::string, std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_map)};
auto const in12 = expect<std::list<std::string>>{":11\r\n", std::list<std::string>{}, "number.optional.int", aedis::make_error_code(aedis::error::expects_resp3_aggregate)};
auto ex = ioc.get_executor();
@@ -128,6 +132,7 @@ void test_number(net::io_context& ioc)
test_sync(ex, in09);
test_sync(ex, in10);
test_sync(ex, in11);
test_sync(ex, in12);
test_async(ex, in01);
test_async(ex, in02);
@@ -140,10 +145,13 @@ void test_number(net::io_context& ioc)
test_async(ex, in09);
test_async(ex, in10);
test_async(ex, in11);
test_async(ex, in12);
ioc.run();
}
void test_bool(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_bool)
{
net::io_context ioc;
boost::optional<bool> ok;
ok = true;
@@ -156,10 +164,10 @@ void test_bool(net::io_context& ioc)
// Error
auto const in01 = expect<boost::optional<bool>>{"#11\r\n", boost::optional<bool>{}, "bool.error", aedis::make_error_code(aedis::error::unexpected_bool_value)};
auto const in03 = expect<std::set<int>>{"#t\r\n", std::set<int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_set_type)};
auto const in04 = expect<std::unordered_set<int>>{"#t\r\n", std::unordered_set<int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_set_type)};
auto const in05 = expect<std::map<int, int>>{"#t\r\n", std::map<int, int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_map_type)};
auto const in06 = expect<std::unordered_map<int, int>>{"#t\r\n", std::unordered_map<int, int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_map_type)};
auto const in03 = expect<std::set<int>>{"#t\r\n", std::set<int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_resp3_set)};
auto const in04 = expect<std::unordered_set<int>>{"#t\r\n", std::unordered_set<int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_resp3_set)};
auto const in05 = expect<std::map<int, int>>{"#t\r\n", std::map<int, int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_resp3_map)};
auto const in06 = expect<std::unordered_map<int, int>>{"#t\r\n", std::unordered_map<int, int>{}, "bool.error", aedis::make_error_code(aedis::error::expects_resp3_map)};
auto ex = ioc.get_executor();
@@ -182,10 +190,12 @@ void test_bool(net::io_context& ioc)
test_async(ex, in09);
test_async(ex, in10);
test_async(ex, in11);
ioc.run();
}
void test_streamed_string(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_streamed_string)
{
net::io_context ioc;
std::string const wire = "$?\r\n;4\r\nHell\r\n;5\r\no wor\r\n;1\r\nd\r\n;0\r\n";
std::vector<node_type> e1a
@@ -210,10 +220,12 @@ void test_streamed_string(net::io_context& ioc)
test_async(ex, in01);
test_async(ex, in02);
test_async(ex, in03);
ioc.run();
}
void test_push(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_push)
{
net::io_context ioc;
std::string const wire = ">4\r\n+pubsub\r\n+message\r\n+some-channel\r\n+some message\r\n";
std::vector<node_type> e1a
@@ -236,10 +248,12 @@ void test_push(net::io_context& ioc)
test_async(ex, in01);
test_async(ex, in02);
ioc.run();
}
void test_map(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_map)
{
net::io_context ioc;
using map_type = std::map<std::string, std::string>;
using mmap_type = std::multimap<std::string, std::string>;
using umap_type = std::unordered_map<std::string, std::string>;
@@ -249,6 +263,7 @@ void test_map(net::io_context& ioc)
using op_vec_type = boost::optional<std::vector<std::string>>;
using tuple_type = std::tuple<std::string, std::string, std::string, std::string, std::string, std::string, std::string, std::string>;
std::string const wire2 = "*3\r\n$2\r\n11\r\n$2\r\n22\r\n$1\r\n3\r\n";
std::string const wire = "%4\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n$4\r\nkey3\r\n$6\r\nvalue3\r\n$4\r\nkey3\r\n$6\r\nvalue3\r\n";
std::vector<node_type> expected_1a
@@ -319,8 +334,10 @@ void test_map(net::io_context& ioc)
auto const in07 = expect<op_map_type>{wire, expected_1d, "map.optional.map"};
auto const in08 = expect<op_vec_type>{wire, expected_1e, "map.optional.vector"};
auto const in09 = expect<std::tuple<op_map_type>>{"*1\r\n" + wire, std::tuple<op_map_type>{expected_1d}, "map.transaction.optional.map"};
auto const in10 = expect<int>{"%11\r\n", int{}, "map.invalid.int", aedis::make_error_code(aedis::error::expects_simple_type)};
auto const in10 = expect<int>{"%11\r\n", int{}, "map.invalid.int", aedis::make_error_code(aedis::error::expects_resp3_simple_type)};
auto const in11 = expect<tuple_type>{wire, e1f, "map.tuple"};
auto const in12 = expect<map_type>{wire2, map_type{}, "map.error", aedis::make_error_code(aedis::error::expects_resp3_map)};
auto const in13 = expect<map_type>{"_\r\n", map_type{}, "map.null", aedis::make_error_code(aedis::error::null)};
auto ex = ioc.get_executor();
@@ -335,6 +352,8 @@ void test_map(net::io_context& ioc)
test_sync(ex, in09);
test_sync(ex, in00);
test_sync(ex, in11);
test_sync(ex, in12);
test_sync(ex, in13);
test_async(ex, in00);
test_async(ex, in01);
@@ -347,6 +366,9 @@ void test_map(net::io_context& ioc)
test_async(ex, in09);
test_async(ex, in00);
test_async(ex, in11);
test_async(ex, in12);
test_async(ex, in13);
ioc.run();
}
void test_attribute(net::io_context& ioc)
@@ -377,8 +399,9 @@ void test_attribute(net::io_context& ioc)
test_async(ex, in02);
}
void test_array(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_array)
{
net::io_context ioc;
char const* wire = "*3\r\n$2\r\n11\r\n$2\r\n22\r\n$1\r\n3\r\n";
std::vector<node_type> e1a
@@ -404,6 +427,8 @@ void test_array(net::io_context& ioc)
auto const in06 = expect<std::array<int, 3>>{wire, e1f, "array.array"};
auto const in07 = expect<std::list<int>>{wire, e1g, "array.list"};
auto const in08 = expect<std::deque<int>>{wire, e1h, "array.deque"};
auto const in09 = expect<std::vector<int>>{"_\r\n", std::vector<int>{}, "array.vector", aedis::make_error_code(aedis::error::null)};
auto const in10 = expect<std::list<int>>{"_\r\n", std::list<int>{}, "array.list", aedis::make_error_code(aedis::error::null)};
auto ex = ioc.get_executor();
@@ -415,6 +440,7 @@ void test_array(net::io_context& ioc)
test_sync(ex, in06);
test_sync(ex, in07);
test_sync(ex, in08);
test_sync(ex, in09);
test_async(ex, in01);
test_async(ex, in02);
@@ -424,10 +450,13 @@ void test_array(net::io_context& ioc)
test_async(ex, in06);
test_async(ex, in07);
test_async(ex, in08);
test_async(ex, in09);
ioc.run();
}
void test_set(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_set)
{
net::io_context ioc;
using set_type = std::set<std::string>;
using mset_type = std::multiset<std::string>;
using uset_type = std::unordered_set<std::string>;
@@ -435,7 +464,9 @@ void test_set(net::io_context& ioc)
using vec_type = std::vector<std::string>;
using op_vec_type = boost::optional<std::vector<std::string>>;
std::string const wire2 = "*3\r\n$2\r\n11\r\n$2\r\n22\r\n$1\r\n3\r\n";
std::string const wire = "~6\r\n+orange\r\n+apple\r\n+one\r\n+two\r\n+three\r\n+orange\r\n";
std::vector<node_type> const expected1a
{ {resp3::type::set, 6UL, 0UL, {}}
, {resp3::type::simple_string, 1UL, 1UL, {"orange"}}
@@ -462,6 +493,7 @@ void test_set(net::io_context& ioc)
auto const in06 = expect<uset_type>{wire, e1c, "set.unordered_set"};
auto const in07 = expect<muset_type>{wire, e1g, "set.unordered_multiset"};
auto const in08 = expect<std::tuple<uset_type>>{"*1\r\n" + wire, std::tuple<uset_type>{e1c}, "set.tuple"};
auto const in09 = expect<set_type>{wire2, set_type{}, "set.error", aedis::make_error_code(aedis::error::expects_resp3_set)};
auto ex = ioc.get_executor();
@@ -474,6 +506,7 @@ void test_set(net::io_context& ioc)
test_sync(ex, in06);
test_sync(ex, in07);
test_sync(ex, in08);
test_sync(ex, in09);
test_async(ex, in00);
test_async(ex, in01);
@@ -484,10 +517,13 @@ void test_set(net::io_context& ioc)
test_async(ex, in06);
test_async(ex, in07);
test_async(ex, in08);
test_async(ex, in09);
ioc.run();
}
void test_simple_error(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_simple_error)
{
net::io_context ioc;
auto const in01 = expect<node_type>{"-Error\r\n", node_type{resp3::type::simple_error, 1UL, 0UL, {"Error"}}, "simple_error.node", aedis::make_error_code(aedis::error::simple_error)};
auto const in02 = expect<node_type>{"-\r\n", node_type{resp3::type::simple_error, 1UL, 0UL, {""}}, "simple_error.node.empty", aedis::make_error_code(aedis::error::simple_error)};
@@ -498,10 +534,12 @@ void test_simple_error(net::io_context& ioc)
test_async(ex, in01);
test_async(ex, in02);
ioc.run();
}
void test_blob_string(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_blob_string)
{
net::io_context ioc;
std::string str(100000, 'a');
str[1000] = '\r';
str[1001] = '\n';
@@ -529,15 +567,17 @@ void test_blob_string(net::io_context& ioc)
test_async(ex, in02);
test_async(ex, in03);
test_async(ex, in04);
ioc.run();
}
void test_double(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_double)
{
// TODO: Add test for double.
net::io_context ioc;
auto const in01 = expect<node_type>{",1.23\r\n", node_type{resp3::type::doublean, 1UL, 0UL, {"1.23"}}, "double.node"};
auto const in02 = expect<node_type>{",inf\r\n", node_type{resp3::type::doublean, 1UL, 0UL, {"inf"}}, "double.node (inf)"};
auto const in03 = expect<node_type>{",-inf\r\n", node_type{resp3::type::doublean, 1UL, 0UL, {"-inf"}}, "double.node (-inf)"};
auto const in04 = expect<double>{",1.23\r\n", double{1.23}, "double.double"};
auto const in05 = expect<double>{",er\r\n", double{0}, "double.double", aedis::make_error_code(aedis::error::not_a_double)};
auto ex = ioc.get_executor();
@@ -545,15 +585,19 @@ void test_double(net::io_context& ioc)
test_sync(ex, in02);
test_sync(ex, in03);
test_sync(ex, in04);
test_sync(ex, in05);
test_async(ex, in01);
test_async(ex, in02);
test_async(ex, in03);
test_async(ex, in04);
test_async(ex, in05);
ioc.run();
}
void test_blob_error(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_blob_error)
{
net::io_context ioc;
auto const in01 = expect<node_type>{"!21\r\nSYNTAX invalid syntax\r\n", node_type{resp3::type::blob_error, 1UL, 0UL, {"SYNTAX invalid syntax"}}, "blob_error", aedis::make_error_code(aedis::error::blob_error)};
auto const in02 = expect<node_type>{"!0\r\n\r\n", node_type{resp3::type::blob_error, 1UL, 0UL, {}}, "blob_error.empty", aedis::make_error_code(aedis::error::blob_error)};
@@ -564,10 +608,12 @@ void test_blob_error(net::io_context& ioc)
test_async(ex, in01);
test_async(ex, in02);
ioc.run();
}
void test_verbatim_string(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_verbatim_string)
{
net::io_context ioc;
auto const in01 = expect<node_type>{"=15\r\ntxt:Some string\r\n", node_type{resp3::type::verbatim_string, 1UL, 0UL, {"txt:Some string"}}, "verbatim_string"};
auto const in02 = expect<node_type>{"=0\r\n\r\n", node_type{resp3::type::verbatim_string, 1UL, 0UL, {}}, "verbatim_string.empty"};
@@ -578,10 +624,12 @@ void test_verbatim_string(net::io_context& ioc)
test_async(ex, in01);
test_async(ex, in02);
ioc.run();
}
void test_big_number(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_big_number)
{
net::io_context ioc;
auto const in01 = expect<node_type>{"(3492890328409238509324850943850943825024385\r\n", node_type{resp3::type::big_number, 1UL, 0UL, {"3492890328409238509324850943850943825024385"}}, "big_number.node"};
auto const in02 = expect<int>{"(\r\n", int{}, "big_number.error (empty field)", aedis::make_error_code(aedis::error::empty_field)};
@@ -592,10 +640,12 @@ void test_big_number(net::io_context& ioc)
test_async(ex, in01);
test_async(ex, in02);
ioc.run();
}
void test_simple_string(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_simple_string)
{
net::io_context ioc;
boost::optional<std::string> ok1, ok2;
ok1 = "OK";
ok2 = "";
@@ -616,10 +666,12 @@ void test_simple_string(net::io_context& ioc)
test_async(ex, in01);
test_async(ex, in02);
test_async(ex, in03);
ioc.run();
}
void test_resp3(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_resp3)
{
net::io_context ioc;
auto const in01 = expect<int>{"s11\r\n", int{}, "number.error", aedis::make_error_code(aedis::error::invalid_data_type)};
auto const in02 = expect<int>{":adf\r\n", int{11}, "number.int", aedis::make_error_code(aedis::error::not_a_number)};
auto const in03 = expect<int>{":\r\n", int{}, "number.error (empty field)", aedis::make_error_code(aedis::error::empty_field)};
@@ -639,10 +691,12 @@ void test_resp3(net::io_context& ioc)
test_async(ex, in03);
test_async(ex, in04);
test_async(ex, in05);
ioc.run();
}
void test_null(net::io_context& ioc)
BOOST_AUTO_TEST_CASE(test_null)
{
net::io_context ioc;
using op_type_01 = boost::optional<bool>;
using op_type_02 = boost::optional<int>;
using op_type_03 = boost::optional<std::string>;
@@ -684,34 +738,73 @@ void test_null(net::io_context& ioc)
test_async(ex, in07);
test_async(ex, in08);
test_async(ex, in09);
}
int main()
{
net::io_context ioc {1};
// Simple types.
test_simple_string(ioc);
test_simple_error(ioc);
test_blob_string(ioc);
test_blob_error(ioc);
test_number(ioc);
test_double(ioc);
test_bool(ioc);
test_null(ioc);
test_big_number(ioc);
test_verbatim_string(ioc);
// Aggregates.
test_array(ioc);
test_set(ioc);
test_map(ioc);
test_push(ioc);
test_streamed_string(ioc);
// RESP3
test_resp3(ioc);
ioc.run();
}
//-----------------------------------------------------------------------------------
void check_error(char const* name, aedis::error ev)
{
auto const ec = aedis::make_error_code(ev);
auto const& cat = ec.category();
BOOST_TEST(std::string(ec.category().name()) == name);
BOOST_TEST(!ec.message().empty());
BOOST_TEST(cat.equivalent(
static_cast<std::underlying_type<aedis::error>::type>(ev),
ec.category().default_error_condition(
static_cast<std::underlying_type<aedis::error>::type>(ev))));
BOOST_TEST(cat.equivalent(ec,
static_cast<std::underlying_type<aedis::error>::type>(ev)));
}
BOOST_AUTO_TEST_CASE(error)
{
check_error("aedis", aedis::error::resolve_timeout);
check_error("aedis", aedis::error::resolve_timeout);
check_error("aedis", aedis::error::connect_timeout);
check_error("aedis", aedis::error::idle_timeout);
check_error("aedis", aedis::error::exec_timeout);
check_error("aedis", aedis::error::invalid_data_type);
check_error("aedis", aedis::error::not_a_number);
check_error("aedis", aedis::error::unexpected_read_size);
check_error("aedis", aedis::error::exceeeds_max_nested_depth);
check_error("aedis", aedis::error::unexpected_bool_value);
check_error("aedis", aedis::error::empty_field);
check_error("aedis", aedis::error::expects_resp3_simple_type);
check_error("aedis", aedis::error::expects_resp3_aggregate);
check_error("aedis", aedis::error::expects_resp3_map);
check_error("aedis", aedis::error::expects_resp3_set);
check_error("aedis", aedis::error::nested_aggregate_unsupported);
check_error("aedis", aedis::error::simple_error);
check_error("aedis", aedis::error::blob_error);
check_error("aedis", aedis::error::incompatible_size);
check_error("aedis", aedis::error::not_a_double);
check_error("aedis", aedis::error::null);
}
std::string get_type_as_str(aedis::resp3::type t)
{
std::ostringstream ss;
ss << t;
return ss.str();
}
BOOST_AUTO_TEST_CASE(type)
{
BOOST_TEST(!get_type_as_str(aedis::resp3::type::array).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::push).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::set).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::map).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::attribute).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::simple_string).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::simple_error).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::number).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::doublean).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::boolean).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::big_number).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::null).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::blob_error).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::verbatim_string).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::blob_string).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::streamed_string_part).empty());
BOOST_TEST(!get_type_as_str(aedis::resp3::type::invalid).empty());
}

View File

@@ -16,7 +16,7 @@ namespace net = boost::asio;
namespace resp3 = aedis::resp3;
using aedis::resp3::request;
using aedis::adapter::adapt;
using aedis::adapter::adapt2;
using net::ip::tcp;
int main()
@@ -41,7 +41,7 @@ int main()
// Reads the responses to all commands in the request.
auto dbuffer = net::dynamic_buffer(buffer);
resp3::read(socket, dbuffer);
resp3::read(socket, dbuffer, adapt(resp));
resp3::read(socket, dbuffer, adapt2(resp));
resp3::read(socket, dbuffer);
std::cout << "Ping: " << resp << std::endl;