2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Progresses with connection events.

This commit is contained in:
Marcelo Zimbres
2022-08-06 12:36:13 +02:00
parent 97428dedb3
commit 54d448cad4
12 changed files with 131 additions and 77 deletions

View File

@@ -1,24 +1,29 @@
# Changelog
## v0.2.2
## master
* `connection::async_read_push` has been renamed to `async_receive`.
* Adds `experimental::exec` and `experimental::receive_event`
functions to offer a thread safe and synchronous way of executing
requests. See `intro_sync.cpp` and `subscriber_sync.cpp` for
examples.
* `connection::async_read_push` has been renamed to
`async_receive_event`.
* Uses `async_receive_event` to communicate internal events to the
user, see subscriber.cpp and `connection::event`.
* The `aedis` directory has been moved to `include` to look more
similar to Boost libraries. Users should now replace `-I/aedis-path`
with `-I/aedis-path/include` in the compiler flags.
* Adds `experimental::exec` functions to offer a thread safe and
synchronous way of executing requests. See `intro_sync.cpp` and
`subscriber_sync.cpp` for an example.
* Fixes a bug in the `connection::async_exec(host, port)` overload
that was causing crashes on reconnect.
that was causing crashes reconnection.
* Fixes the executor usage in the connection class. Before theses
changes it was only supporting `any_io_executor`.
changes it was imposing `any_io_executor` on users.
* `connection::async_receiver` is not cancelled anymore when
* `connection::async_receiver_event` is not cancelled anymore when
`connection::async_run` exits. This change simplifies the
implementation failover operations.
@@ -31,7 +36,8 @@
* Many simplifications in the `chat_room` example.
* Fixes build in clang the compilers and makes some improvements in the documentation.
* Fixes build in clang the compilers and makes some improvements in
the documentation.
##v0.2.1

View File

@@ -1 +1,16 @@
See the official github-pages for documentation: https://mzimbres.github.io/aedis
An async redis client designed for performance and scalability
### License
Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt).
### Build Status
Branch | GH Actions |
:-------------: | ---------- |
[`master`](https://github.com/mzimbres/aedis/tree/master) | [![CI](https://github.com/mzimbres/aedis/actions/workflows/ci.yml/badge.svg?branch=master)](https://github.com/mzimbres/aedis/actions/workflows/ci.yml)
### More information
* See the official github-pages for documentation: https://mzimbres.github.io/aedis

View File

@@ -823,7 +823,7 @@ WARN_LOGFILE =
# spaces. See also FILE_PATTERNS and EXTENSION_MAPPING
# Note: If this tag is empty the current directory is searched.
INPUT = include benchmarks/benchmarks.md examples
INPUT = include benchmarks/benchmarks.md . examples
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

View File

@@ -9,6 +9,7 @@
#include <boost/asio.hpp>
#include <aedis.hpp>
#include "unistd.h"
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
@@ -16,7 +17,6 @@
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
@@ -33,17 +33,23 @@ using response_type = std::vector<aedis::resp3::node<std::string>>;
net::awaitable<void> reader(std::shared_ptr<connection> db)
{
try {
request req;
req.push("SUBSCRIBE", "chat-channel");
co_await db->async_exec(req);
request req;
req.push("SUBSCRIBE", "chat-channel");
for (response_type resp;;) {
co_await db->async_receive(adapt(resp));
std::cout << "> " << resp.at(3).value;
resp.clear();
for (response_type resp;;) {
auto const ev = co_await db->async_receive_event(aedis::adapt(resp));
switch (ev) {
case connection::event::push:
print_push(resp);
break;
case connection::event::hello:
co_await db->async_exec(req);
break;
default:;
}
} catch (std::exception const&) {
resp.clear();
}
}
@@ -70,6 +76,7 @@ int main()
net::io_context ioc{1};
net::posix::stream_descriptor in{ioc, ::dup(STDIN_FILENO)};
auto db = std::make_shared<connection>(ioc);
db->get_config().enable_events = true;
co_spawn(ioc, run(in, db), net::detached);
co_spawn(ioc, reader(db), net::detached);
@@ -77,6 +84,8 @@ int main()
std::cout << ec.message() << std::endl;
});
net::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ ioc.stop(); });
ioc.run();
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;

View File

@@ -19,30 +19,26 @@ using aedis::resp3::request;
using aedis::experimental::exec;
using connection = aedis::connection<>;
// TODO: What is causing the delay?
int main()
{
try {
net::io_context ioc{1};
connection conn{ioc};
std::thread thread{[&]() { ioc.run(); }};
// Calls async_run in the correct executor.
net::dispatch(net::bind_executor(ioc, [&]() {
std::thread thread{[&]() {
conn.async_run(net::detached);
}));
ioc.run();
}};
request req;
req.push("PING");
req.push("QUIT");
// Executes commands synchronously.
std::tuple<aedis::ignore, std::string, aedis::ignore> resp;
std::tuple<std::string, aedis::ignore> resp;
exec(conn, req, adapt(resp));
thread.join();
std::cout << "Response: " << std::get<1>(resp) << std::endl;
std::cout << "Response: " << std::get<0>(resp) << std::endl;
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}

View File

@@ -25,21 +25,16 @@ using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>
using connection = aedis::connection<tcp_socket>;
using net::experimental::as_tuple;
/* In this example we send a subscription to a channel and start
* reading server side messages indefinitely.
/* This example will subscribe and read pushes indefinitely.
*
* After starting the example you can test it by sending messages with
* redis-cli like this
* To test send messages with redis-cli
*
* $ redis-cli -3
* 127.0.0.1:6379> PUBLISH channel1 some-message
* (integer) 3
* 127.0.0.1:6379>
*
* The messages will then appear on the terminal you are running the
* example.
*
* To test reconnection try for example to close all clients currently
* To test reconnection try, for example, to close all clients currently
* connected to the Redis instance
*
* $ redis-cli
@@ -52,7 +47,7 @@ net::awaitable<void> reader(std::shared_ptr<connection> db)
req.push("SUBSCRIBE", "channel");
for (std::vector<node_type> resp;;) {
auto [ec, ev, n] = co_await db->async_receive(aedis::adapt(resp), as_tuple(net::use_awaitable));
auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(resp), as_tuple(net::use_awaitable));
std::cout << "Event: " << aedis::to_string<tcp_socket>(ev) << std::endl;
@@ -60,9 +55,11 @@ net::awaitable<void> reader(std::shared_ptr<connection> db)
case connection::event::push:
print_push(resp);
break;
case connection::event::connect:
case connection::event::hello:
co_await db->async_exec(req);
break;
default:;
}

View File

@@ -9,17 +9,18 @@
#include <boost/asio.hpp>
#include <aedis.hpp>
#include <aedis/experimental/sync.hpp>
#include "print.hpp"
// Include this in no more than one .cpp file.
#include <aedis/src.hpp>
namespace net = boost::asio;
using aedis::adapt;
using aedis::resp3::request;
using aedis::experimental::exec;
using aedis::experimental::receive;
using aedis::experimental::receive_event;
using connection = aedis::connection<>;
using node = aedis::resp3::node<std::string>;
using aedis::resp3::node;
using event = connection::event;
// See subscriber.cpp for more info about how to run this example.
@@ -28,24 +29,30 @@ int main()
try {
net::io_context ioc{1};
connection conn{ioc};
conn.get_config().enable_events = true;
std::thread thread{[&]() {
request req;
req.push("HELLO", 3);
req.push("SUBSCRIBE", "channel");
conn.async_run(req, adapt(), net::detached);
conn.async_run(net::detached);
ioc.run();
}};
for (std::vector<node> resp;;) {
boost::system::error_code ec;
receive(conn, adapt(resp), ec);
request req;
req.push("SUBSCRIBE", "channel");
std::cout
<< "Event: " << resp.at(1).value << "\n"
<< "Channel: " << resp.at(2).value << "\n"
<< "Message: " << resp.at(3).value << "\n"
<< std::endl;
for (std::vector<node<std::string>> resp;;) {
boost::system::error_code ec;
auto const ev = receive_event(conn, aedis::adapt(resp), ec);
switch (ev) {
case connection::event::push:
print_push(resp);
break;
case connection::event::hello:
exec(conn, req);
break;
default:;
}
resp.clear();
}

View File

@@ -15,7 +15,7 @@
/** \mainpage Documentation
\tableofcontents
Useful links: \subpage any, [Benchmarks](benchmarks/benchmarks.md).
Useful links: \subpage any, [Changelog](CHANGELOG.md) and [Benchmarks](benchmarks/benchmarks.md).
\section Overview

View File

@@ -83,10 +83,17 @@ public:
bool enable_events = false;
};
/// Events communicated through \c async_receive_event.
enum class event {
/// The address has been successfully resolved.
resolve,
/// Connected to the Redis server.
connect,
/// Success sending AUTH and HELLO.
hello,
/// A push event has been received.
push,
/// Used internally.
invalid
};
@@ -141,7 +148,7 @@ public:
* connection::config::ping_interval.
*
* \li Starts reading from the socket and delivering events to the
* request started with \c async_exec and \c async_receive.
* request started with \c async_exec and \c async_receive_event.
*
* For an example see echo_server.cpp.
*
@@ -255,7 +262,7 @@ public:
template <
class Adapter = detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
auto async_receive(
auto async_receive_event(
Adapter adapter = adapt(),
CompletionToken token = CompletionToken{})
{
@@ -268,7 +275,7 @@ public:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, event, std::size_t)
, void(boost::system::error_code, event)
>(detail::receive_op<connection, decltype(f)>{this, f}, token, resv_);
}
@@ -294,7 +301,7 @@ public:
* safe to try a reconnect after that, when that happens, all
* pending request will be automatically sent.
*
* Calling this function will causes @c async_receive to return
* Calling this function will causes @c async_receive_event to return
* with @c boost::asio::experimental::channel_errc::channel_cancelled.
*
* \remarks
@@ -513,7 +520,9 @@ char const* to_string(typename connection<T>::event e)
switch (e) {
case event_type::resolve: return "resolve";
case event_type::connect: return "connect";
case event_type::hello: return "hello";
case event_type::push: return "push";
case event_type::invalid: return "invalid";
default: BOOST_ASSERT_MSG(false, "to_string: unhandled event.");
}
}

View File

@@ -94,7 +94,7 @@ struct receive_op {
{
yield conn->push_channel_.async_receive(std::move(self));
if (ec) {
self.complete(ec, Conn::event::invalid, 0);
self.complete(ec, Conn::event::invalid);
return;
}
@@ -103,7 +103,7 @@ struct receive_op {
yield resp3::async_read(*conn->socket_, conn->make_dynamic_buffer(), adapter, std::move(self));
if (ec) {
conn->cancel_run();
self.complete(ec, Conn::event::invalid, 0);
self.complete(ec, Conn::event::invalid);
return;
}
@@ -111,7 +111,7 @@ struct receive_op {
}
yield conn->push_channel_.async_send({}, 0, std::move(self));
self.complete(ec, conn->last_event_, read_size);
self.complete(ec, conn->last_event_);
return;
}
}
@@ -418,6 +418,15 @@ struct run_op {
return;
}
if (conn->cfg_.enable_events) {
conn->last_event_ = Conn::event::connect;
yield async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
self.complete(ec);
return;
}
}
conn->req_.clear();
if (!std::empty(conn->cfg_.username) && !std::empty(conn->cfg_.password))
conn->req_.push("AUTH", conn->cfg_.username, conn->cfg_.password);
@@ -442,7 +451,7 @@ struct run_op {
}
if (conn->cfg_.enable_events) {
conn->last_event_ = Conn::event::connect;
conn->last_event_ = Conn::event::hello;
yield async_send_receive(conn->push_channel_, std::move(self));
if (ec) {
self.complete(ec);

View File

@@ -7,6 +7,7 @@
#ifndef AEDIS_EXPERIMENTAL_SYNC_HPP
#define AEDIS_EXPERIMENTAL_SYNC_HPP
#include <aedis/adapt.hpp>
#include <aedis/connection.hpp>
#include <aedis/resp3/request.hpp>
@@ -73,9 +74,14 @@ exec(
* @throws std::system_error in case of error.
* @returns The number of bytes of the response.
*/
template <class Connection, class ResponseAdapter>
template <
class Connection,
class ResponseAdapter = aedis::detail::response_traits<void>::adapter_type>
std::size_t
exec(Connection& conn, resp3::request const& req, ResponseAdapter adapter)
exec(
Connection& conn,
resp3::request const& req,
ResponseAdapter adapter = aedis::adapt())
{
boost::system::error_code ec;
auto const res = exec(conn, req, adapter, ec);
@@ -95,23 +101,23 @@ exec(Connection& conn, resp3::request const& req, ResponseAdapter adapter)
* @returns The number of bytes of the response.
*/
template <class Connection, class ResponseAdapter>
std::size_t
receive(
auto receive_event(
Connection& conn,
ResponseAdapter adapter,
boost::system::error_code& ec)
{
using event_type = typename Connection::event;
std::mutex mutex;
std::condition_variable cv;
bool ready = false;
std::size_t res = 0;
event_type ev = event_type::invalid;
auto f = [&conn, &ec, &res, &mutex, &cv, &ready, adapter]()
auto f = [&conn, &ec, &ev, &mutex, &cv, &ready, adapter]()
{
conn.async_receive(adapter, [&cv, &mutex, &ready, &res, &ec](auto const& ecp, auto, std::size_t n) {
conn.async_receive_event(adapter, [&cv, &mutex, &ready, &ev, &ec](auto const& ecp, event_type evp) {
std::unique_lock ul(mutex);
ec = ecp;
res = n;
ev = evp;
ready = true;
ul.unlock();
cv.notify_one();
@@ -121,7 +127,7 @@ receive(
boost::asio::dispatch(boost::asio::bind_executor(conn.get_executor(), f));
std::unique_lock lk(mutex);
cv.wait(lk, [&ready]{return ready;});
return res;
return ev;
}
} // experimental

View File

@@ -203,13 +203,13 @@ net::awaitable<void>
push_consumer1(std::shared_ptr<connection> db, bool& received, char const* msg)
{
{
auto [ec, ev, n] = co_await db->async_receive(aedis::adapt(), as_tuple(net::use_awaitable));
auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable));
expect_no_error(ec, msg);
received = true;
}
{
auto [ec, ev, n] = co_await db->async_receive(aedis::adapt(), as_tuple(net::use_awaitable));
auto [ec, ev] = co_await db->async_receive_event(aedis::adapt(), as_tuple(net::use_awaitable));
expect_error(ec, boost::asio::experimental::channel_errc::channel_cancelled, msg);
}
}
@@ -322,7 +322,7 @@ net::awaitable<void>
push_consumer3(std::shared_ptr<connection> db)
{
for (;;)
co_await db->async_receive(aedis::adapt(), net::use_awaitable);
co_await db->async_receive_event(aedis::adapt(), net::use_awaitable);
}
// Test many subscribe requests.