2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Improvements with timeouts and simplifications.

This commit is contained in:
Marcelo Zimbres
2022-05-25 21:43:37 +02:00
parent 3dff0b78de
commit 4858c078f9
12 changed files with 189 additions and 147 deletions

View File

@@ -59,11 +59,11 @@ endif
nobase_include_HEADERS =\
$(top_srcdir)/aedis/src.hpp\
$(top_srcdir)/aedis/redis/command.hpp\
$(top_srcdir)/aedis/generic/client.hpp\
$(top_srcdir)/aedis/generic/connection.hpp\
$(top_srcdir)/aedis/generic/request.hpp\
$(top_srcdir)/aedis/generic/error.hpp\
$(top_srcdir)/aedis/generic/impl/error.ipp\
$(top_srcdir)/aedis/generic/detail/client_ops.hpp\
$(top_srcdir)/aedis/generic/detail/connection_ops.hpp\
$(top_srcdir)/aedis/sentinel/command.hpp\
$(top_srcdir)/aedis/aedis.hpp\
$(top_srcdir)/aedis/adapter/detail/adapters.hpp\

View File

@@ -14,8 +14,8 @@
#include <aedis/redis/command.hpp>
#include <aedis/sentinel/command.hpp>
#include <aedis/generic/error.hpp>
#include <aedis/generic/client.hpp>
#include <aedis/generic/request.hpp>
#include <aedis/generic/connection.hpp>
/** \mainpage Documentation
\tableofcontents

View File

@@ -4,8 +4,8 @@
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_GENERIC_CLIENT_HPP
#define AEDIS_GENERIC_CLIENT_HPP
#ifndef AEDIS_GENERIC_CONNECTION_HPP
#define AEDIS_GENERIC_CONNECTION_HPP
#include <vector>
#include <limits>
@@ -26,7 +26,7 @@
#include <aedis/redis/command.hpp>
#include <aedis/adapter/adapt.hpp>
#include <aedis/generic/request.hpp>
#include <aedis/generic/detail/client_ops.hpp>
#include <aedis/generic/detail/connection_ops.hpp>
// TODO: Don't pass pong to the adapter.
// TODO: Merge the requests payload.
@@ -34,7 +34,7 @@
namespace aedis {
namespace generic {
/** \brief A high level Redis client.
/** \brief A high level Redis connection.
* \ingroup any
*
* This class keeps a connection open to the Redis server where
@@ -43,8 +43,8 @@ namespace generic {
*
* https://redis.io/docs/reference/sentinel-clients
*/
template <class AsyncReadWriteStream, class Command>
class client {
template <class Command, class AsyncReadWriteStream = boost::asio::ip::tcp::socket>
class connection {
public:
/// Executor type.
using executor_type = typename AsyncReadWriteStream::executor_type;
@@ -93,7 +93,7 @@ public:
* \param ex The executor.
* \param cfg Configuration parameters.
*/
client(
connection(
boost::asio::any_io_executor ex,
adapter_type adapter,
config cfg = config{})
@@ -102,7 +102,7 @@ public:
, ping_timer_{ex}
, write_timer_{ex}
, wait_write_timer_{ex}
, idle_check_timer{ex}
, check_idle_timer_{ex}
, read_ch_{ex}
, push_ch_{ex}
, cfg_{cfg}
@@ -111,11 +111,11 @@ public:
{
}
client(
connection(
boost::asio::any_io_executor ex,
adapter_type2 a = adapter::adapt(),
config cfg = config{})
: client(
: connection(
ex,
[adapter = std::move(a)] (Command cmd, resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable { if (cmd != Command::ping) adapter(nd, ec); },
cfg)
@@ -129,29 +129,29 @@ public:
* This function performs the following steps
*
* @li Resolves the Redis host as of \c async_resolve with the
* timeout passed in client::config::resolve_timeout.
* timeout passed in connection::config::resolve_timeout.
*
* @li Connects to one of the endpoints returned by the resolve
* operation with the timeout passed in client::config::connect_timeout.
* operation with the timeout passed in connection::config::connect_timeout.
*
* @li Starts the \c async_read operation that keeps reading incoming
* responses. Each individual read uses the timeout passed on
* client::config::read_timeout. After each successful read it
* connection::config::read_timeout. After each successful read it
* will call the read or push callback.
*
* @li Starts the \c async_write operation that waits for new commands
* to be sent to Redis. Each individual write uses the timeout
* passed on client::config::write_timeout. After a successful
* passed on connection::config::write_timeout. After a successful
* write it will call the write callback.
*
* @li Starts the idle check operation with the timeout of twice
* the value of client::config::ping_delay_timeout. If no data is
* the value of connection::config::ping_delay_timeout. If no data is
* received during that time interval \c async_run completes with
* generic::error::idle_timeout.
*
* @li Starts the healthy check operation that sends
* redis::command::ping to Redis with a frequency equal to
* client::config::ping_delay_timeout.
* connection::config::ping_delay_timeout.
*
* In addition to the callbacks mentioned above, the read
* operations will call the resp3 callback as soon a new chunks of
@@ -160,13 +160,13 @@ public:
* It is safe to call \c async_run after it has returned. In this
* case, any outstanding commands will be sent after the
* connection is restablished. If a disconnect occurs while the
* response to a request has not been received, the client doesn't
* response to a request has not been received, the connection doesn't
* try to resend it to avoid resubmission.
*
* Example:
*
* @code
* awaitable<void> run_with_reconnect(std::shared_ptr<client_type> db)
* awaitable<void> run_with_reconnect(std::shared_ptr<connection_type> db)
* {
* auto ex = co_await this_coro::executor;
* asio::steady_timer timer{ex};
@@ -195,7 +195,7 @@ public:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::run_op<client, Command>{this}, token, read_timer_);
>(detail::run_op<connection, Command>{this}, token, read_timer_);
}
void add_request(request<Command>& req)
@@ -216,7 +216,7 @@ public:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<client>{this, &req}, token, read_timer_);
>(detail::exec_op<connection>{this, &req}, token, read_timer_);
}
/** @brief Receives events produced by the run operation.
@@ -269,7 +269,8 @@ private:
template <class T, class V> friend struct detail::ping_op;
template <class T, class V> friend struct detail::read_with_timeout_op;
template <class T, class V> friend struct detail::run_op;
template <class T, class V> friend struct detail::exec_op2;
template <class T, class V> friend struct detail::exec_internal_impl_op;
template <class T, class V> friend struct detail::exec_internal_op;
template <class T> friend struct detail::write_op;
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::write_with_timeout_op;
@@ -325,7 +326,7 @@ private:
return can_write;
}
// Calls client::async_resolve with the resolve timeout passed in
// Calls connection::async_resolve with the resolve timeout passed in
// the config. Uses the write_timer_ to perform the timeout op.
template <class CompletionToken = default_completion_token_type>
auto
@@ -335,11 +336,11 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::resolve_with_timeout_op<client>{this},
>(detail::resolve_with_timeout_op<connection>{this},
token, resv_.get_executor());
}
// Calls client::async_connect with a timeout.
// Calls connection::async_connect with a timeout.
template <class CompletionToken = default_completion_token_type>
auto
async_connect_with_timeout(
@@ -348,7 +349,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::connect_with_timeout_op<client>{this}, token,
>(detail::connect_with_timeout_op<connection>{this}, token,
write_timer_.get_executor());
}
@@ -363,7 +364,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::read_with_timeout_op<client, Command>{this, cmd},
>(detail::read_with_timeout_op<connection, Command>{this, cmd},
token, read_timer_.get_executor());
}
@@ -375,7 +376,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::reader_op<client, Command>{this}, token, read_timer_.get_executor());
>(detail::reader_op<connection, Command>{this}, token, read_timer_.get_executor());
}
template <class CompletionToken = default_completion_token_type>
@@ -385,7 +386,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::write_op<client>{this}, token, write_timer_);
>(detail::write_op<connection>{this}, token, write_timer_);
}
// Write with a timeout.
@@ -397,7 +398,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::write_with_timeout_op<client>{this}, token, write_timer_);
>(detail::write_with_timeout_op<connection>{this}, token, write_timer_);
}
template <class CompletionToken = default_completion_token_type>
@@ -407,7 +408,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::writer_op<client>{this}, token, wait_write_timer_);
>(detail::writer_op<connection>{this}, token, wait_write_timer_);
}
template <class CompletionToken = default_completion_token_type>
@@ -417,7 +418,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::read_write_check_ping_op<client>{this}, token, read_timer_, write_timer_, wait_write_timer_, idle_check_timer);
>(detail::read_write_check_ping_op<connection>{this}, token, read_timer_, write_timer_, wait_write_timer_, check_idle_timer_);
}
template <class CompletionToken = default_completion_token_type>
@@ -427,7 +428,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::ping_op<client, Command>{this}, token, read_timer_);
>(detail::ping_op<connection, Command>{this}, token, read_timer_);
}
template <class CompletionToken = default_completion_token_type>
@@ -437,18 +438,30 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::idle_check_op<client>{this}, token, idle_check_timer);
>(detail::idle_check_op<connection>{this}, token, check_idle_timer_);
}
template <class CompletionToken = default_completion_token_type>
auto async_exec2(
auto async_exec_internal_impl(
request_type& req,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::exec_op2<client, request_type>{this, &req},
>(detail::exec_internal_impl_op<connection, request_type>{this, &req},
token, read_timer_);
}
template <class CompletionToken = default_completion_token_type>
auto async_exec_internal(
request_type& req,
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::exec_internal_op<connection, request_type>{this, &req},
token, read_timer_);
}
@@ -459,7 +472,7 @@ private:
boost::asio::steady_timer ping_timer_;
boost::asio::steady_timer write_timer_;
boost::asio::steady_timer wait_write_timer_;
boost::asio::steady_timer idle_check_timer;
boost::asio::steady_timer check_idle_timer_;
read_channel_type read_ch_;
read_channel_type push_ch_;
@@ -496,4 +509,4 @@ private:
} // generic
} // aedis
#endif // AEDIS_GENERIC_CLIENT_HPP
#endif // AEDIS_GENERIC_CONNECTION_HPP

View File

@@ -4,8 +4,8 @@
* accompanying file LICENSE.txt)
*/
#ifndef AEDIS_GENERIC_CLIENT_OPS_HPP
#define AEDIS_GENERIC_CLIENT_OPS_HPP
#ifndef AEDIS_GENERIC_CONNECTION_OPS_HPP
#define AEDIS_GENERIC_CONNECTION_OPS_HPP
#include <array>
@@ -29,10 +29,10 @@ namespace detail {
#include <boost/asio/yield.hpp>
template <class Client, class Request>
struct exec_op2 {
Client* cli;
typename Client::request_type* req;
template <class Conn, class Request>
struct exec_internal_impl_op {
Conn* cli;
typename Conn::request_type* req;
boost::asio::coroutine coro;
template <class Self>
@@ -53,7 +53,6 @@ struct exec_op2 {
return;
}
// Say hello and ignores the response.
yield
resp3::async_read(
*cli->socket_,
@@ -66,10 +65,60 @@ struct exec_op2 {
}
};
template <class Client>
template <class Conn, class Request>
struct exec_internal_op {
Conn* cli;
typename Conn::request_type* req;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
// Idle timeout.
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_delay_timeout);
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->async_exec_internal_impl(*req, token);},
[this](auto token) { return cli->check_idle_timer_.async_wait(token);}
).async_wait(
boost::asio::experimental::wait_for_one(),
std::move(self));
switch (order[0]) {
case 0:
{
if (ec1) {
self.complete(ec1);
return;
}
} break;
case 1:
{
if (!ec2) {
self.complete(error::idle_timeout);
return;
}
} break;
default: BOOST_ASSERT(false);
}
self.complete({});
}
}
};
template <class Conn>
struct exec_op {
Client* cli;
typename Client::request_type* req;
Conn* cli;
typename Conn::request_type* req;
std::size_t read_size = 0;
boost::asio::coroutine coro;
@@ -121,9 +170,9 @@ struct exec_op {
}
};
template <class Client, class Command>
template <class Conn, class Command>
struct ping_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -160,9 +209,9 @@ struct ping_op {
}
};
template <class Client>
template <class Conn>
struct idle_check_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -170,8 +219,8 @@ struct idle_check_op {
{
reenter (coro) for (;;)
{
cli->idle_check_timer.expires_after(2 * cli->cfg_.ping_delay_timeout);
yield cli->idle_check_timer.async_wait(std::move(self));
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_delay_timeout);
yield cli->check_idle_timer_.async_wait(std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -189,9 +238,9 @@ struct idle_check_op {
}
};
template <class Client>
template <class Conn>
struct resolve_with_timeout_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -227,7 +276,7 @@ struct resolve_with_timeout_op {
case 1:
{
if (!ec2) {
self.complete(generic::error::resolve_timeout);
self.complete(error::resolve_timeout);
return;
}
} break;
@@ -241,9 +290,9 @@ struct resolve_with_timeout_op {
}
};
template <class Client>
template <class Conn>
struct connect_with_timeout_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -279,7 +328,7 @@ struct connect_with_timeout_op {
case 1:
{
if (!ec2) {
self.complete(generic::error::connect_timeout);
self.complete(error::connect_timeout);
return;
}
} break;
@@ -292,9 +341,9 @@ struct connect_with_timeout_op {
}
};
template <class Client>
template <class Conn>
struct read_write_check_ping_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -347,9 +396,9 @@ struct read_write_check_ping_op {
}
};
template <class Client, class Command>
template <class Conn, class Command>
struct run_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -365,7 +414,7 @@ struct run_op {
cli->socket_ =
std::make_shared<
typename Client::next_layer_type
typename Conn::next_layer_type
>(cli->read_timer_.get_executor());
yield cli->async_connect_with_timeout(std::move(self));
@@ -376,7 +425,7 @@ struct run_op {
cli->req_.clear();
cli->req_.push(Command::hello, 3);
yield cli->async_exec2(cli->req_, std::move(self));
yield cli->async_exec_internal(cli->req_, std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -393,9 +442,9 @@ struct run_op {
}
};
template <class Client>
template <class Conn>
struct write_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -434,9 +483,9 @@ struct write_op {
}
};
template <class Client>
template <class Conn>
struct write_with_timeout_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -470,7 +519,7 @@ struct write_with_timeout_op {
case 1:
{
if (!ec2) {
self.complete(generic::error::write_timeout, 0);
self.complete(error::write_timeout, 0);
return;
}
} break;
@@ -483,9 +532,9 @@ struct write_with_timeout_op {
}
};
template <class Client>
template <class Conn>
struct writer_op {
Client* cli;
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
@@ -505,24 +554,22 @@ struct writer_op {
}
}
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(!cli->reqs_.front().req->empty());
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(n == cli->reqs_.front().req->size());
yield cli->wait_write_timer_.async_wait(std::move(self));
if (!cli->socket_->is_open()) {
self.complete(error::write_stop_requested);
return;
}
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(!cli->reqs_.front().req->empty());
}
}
};
template <class Client, class Command>
template <class Conn, class Command>
struct read_with_timeout_op {
Client* cli;
Conn* cli;
Command cmd;
boost::asio::coroutine coro;
@@ -557,7 +604,7 @@ struct read_with_timeout_op {
case 1:
{
if (!ec2) {
self.complete(generic::error::read_timeout, 0);
self.complete(error::read_timeout, 0);
return;
}
} break;
@@ -570,9 +617,9 @@ struct read_with_timeout_op {
}
};
template <class Client, class Command>
template <class Conn, class Command>
struct reader_op {
Client* cli;
Conn* cli;
resp3::type type_ = resp3::type::invalid;
Command cmd_ = Command::invalid;
boost::asio::coroutine coro;
@@ -654,4 +701,4 @@ struct reader_op {
} // generic
} // aedis
#endif // AEDIS_GENERIC_CLIENT_OPS_HPP
#endif // AEDIS_GENERIC_CONNECTION_OPS_HPP

View File

@@ -18,7 +18,7 @@ using aedis::resp3::node;
using aedis::adapter::adapt;
using aedis::redis::command;
using aedis::generic::request;
using client_type = aedis::generic::client<net::ip::tcp::socket, command>;
using connection = aedis::generic::connection<command>;
using response_type = std::vector<aedis::resp3::node<std::string>>;
class user_session:
@@ -30,7 +30,7 @@ public:
{ timer_.expires_at(std::chrono::steady_clock::time_point::max()); }
void
start(std::shared_ptr<client_type> db,
start(std::shared_ptr<connection> db,
std::shared_ptr<response_type> resp)
{
co_spawn(socket_.get_executor(),
@@ -51,7 +51,7 @@ public:
private:
net::awaitable<void>
reader(
std::shared_ptr<client_type> db,
std::shared_ptr<connection> db,
std::shared_ptr<response_type> resp)
{
try {
@@ -102,7 +102,7 @@ using sessions_type = std::vector<std::shared_ptr<user_session>>;
net::awaitable<void>
push_reader(
std::shared_ptr<client_type> db,
std::shared_ptr<connection> db,
std::shared_ptr<response_type> resp,
std::shared_ptr<sessions_type> sessions)
{
@@ -119,7 +119,7 @@ push_reader(
net::awaitable<void>
listener(
std::shared_ptr<net::ip::tcp::acceptor> acc,
std::shared_ptr<client_type> db,
std::shared_ptr<connection> db,
std::shared_ptr<sessions_type> sessions,
std::shared_ptr<response_type> resp)
{
@@ -137,13 +137,12 @@ int main()
net::io_context ioc{1};
// Redis client and receiver.
auto db = std::make_shared<client_type>(ioc.get_executor());
auto db = std::make_shared<connection>(ioc.get_executor());
db->async_run([](auto ec){ std::cout << ec.message() << std::endl;});
// Sends hello and subscribes to the channel. Ignores the
// response.
request<command> req;
req.push(command::hello, 3);
req.push(command::subscribe, "channel");
db->async_exec(req, [](auto, auto){});

View File

@@ -18,29 +18,31 @@ using aedis::resp3::node;
using aedis::adapter::adapt;
using aedis::generic::request;
using aedis::redis::command;
using client_type = aedis::generic::client<net::ip::tcp::socket, command>;
using response_type = std::vector<aedis::resp3::node<std::string>>;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::acceptor>;
using connection = aedis::generic::connection<command>;
net::awaitable<void>
echo_loop(
tcp_socket socket,
std::shared_ptr<client_type> db,
std::shared_ptr<connection> db,
std::shared_ptr<response_type> resp)
{
try {
for (std::string msg;;) {
auto const n = co_await net::async_read_until(socket, net::dynamic_buffer(msg, 1024), "\n", net::use_awaitable);
std::string msg;
for (auto dbuffer = net::dynamic_buffer(msg, 1024);;) {
auto const n = co_await net::async_read_until(socket, dbuffer, "\n");
request<command> req;
req.push(command::incr, "echo-server-counter");
req.push(command::set, "echo-server-key", msg);
req.push(command::get, "echo-server-key");
req.push(command::incr, "echo-server-counter");
co_await db->async_exec(req, net::use_awaitable);
co_await net::async_write(socket, net::buffer(resp->at(1).value), net::use_awaitable);
std::cout << "Echos so far: " << resp->at(2) << std::endl;
resp->at(0).value += ": ";
resp->at(0).value += resp->at(2).value;
co_await net::async_write(socket, net::buffer(resp->at(0).value));
resp->clear();
msg.erase(0, n);
dbuffer.consume(n);
}
} catch (std::exception const& e) {
std::cout << "Error: " << e.what() << std::endl;
@@ -50,7 +52,7 @@ echo_loop(
net::awaitable<void>
listener(
std::shared_ptr<tcp_acceptor> acc,
std::shared_ptr<client_type> db,
std::shared_ptr<connection> db,
std::shared_ptr<response_type> resp)
{
auto ex = co_await net::this_coro::executor;
@@ -68,14 +70,9 @@ int main()
// Redis client.
auto resp = std::make_shared<response_type>();
auto db = std::make_shared<client_type>(ioc.get_executor(), adapt(*resp));
auto db = std::make_shared<connection>(ioc.get_executor(), adapt(*resp));
db->async_run([](auto ec){ std::cout << ec.message() << std::endl;});
// Sends hello and ignores the response.
request<command> req;
req.push(command::hello, 3);
db->async_exec(req, [](auto, auto){});
// TCP acceptor.
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<tcp_acceptor>(ioc.get_executor(), endpoint);

View File

@@ -17,7 +17,7 @@ using aedis::resp3::node;
using aedis::adapter::adapt;
using aedis::redis::command;
using aedis::generic::request;
using client_type = aedis::generic::client<net::ip::tcp::socket, command>;
using connection = aedis::generic::connection<command>;
auto run_handler =[](auto ec)
{
@@ -34,23 +34,13 @@ int main()
std::vector<node<std::string>> resp;
net::io_context ioc;
connection db{ioc.get_executor(), adapt(resp)};
client_type db{ioc.get_executor()};
db.set_adapter(adapt(resp));
request<command> req2;
req2.push(command::set, "intro-key", "message1");
req2.push(command::get, "intro-key");
db.async_exec(req2, exec_handler);
request<command> req3;
req3.push(command::set, "intro-key", "message2");
req3.push(command::get, "intro-key");
db.async_exec(req3, exec_handler);
request<command> req4;
req4.push(command::quit);
db.async_exec(req4, exec_handler);
request<command> req;
req.push(command::set, "intro-key", "message1");
req.push(command::get, "intro-key");
req.push(command::quit);
db.async_exec(req, exec_handler);
db.async_run(run_handler);
ioc.run();

View File

@@ -20,7 +20,6 @@ using aedis::resp3::node;
using aedis::adapter::adapt;
using aedis::adapter::adapter_t;
using aedis::redis::command;
using aedis::generic::client;
void print_and_clear_aggregate(std::vector<aedis::resp3::node<std::string>>& v)
{

View File

@@ -20,7 +20,7 @@ using aedis::adapter::adapt;
using aedis::adapter::adapter_t;
using aedis::redis::command;
using aedis::generic::request;
using client_type = aedis::generic::client<net::ip::tcp::socket, command>;
using connection = aedis::generic::connection<command>;
using node_type = aedis::resp3::node<boost::string_view>;
using error_code = boost::system::error_code;
@@ -84,7 +84,7 @@ int main()
T2 resp2;
net::io_context ioc;
client_type db{ioc.get_executor(), adapter{resp0, resp1, resp2}};
connection db{ioc.get_executor(), adapter{resp0, resp1, resp2}};
request<command> req;
req.push_range(command::rpush, "rpush-key", vec);

View File

@@ -15,7 +15,7 @@ namespace net = boost::asio;
using aedis::adapter::adapt;
using aedis::redis::command;
using aedis::generic::request;
using client_type = aedis::generic::client<net::ip::tcp::socket, command>;
using connection = aedis::generic::connection<command>;
using response_type = std::vector<aedis::resp3::node<std::string>>;
/* In this example we send a subscription to a channel and start
@@ -34,7 +34,7 @@ using response_type = std::vector<aedis::resp3::node<std::string>>;
*/
net::awaitable<void>
push_reader(std::shared_ptr<client_type> db, response_type& resp)
push_reader(std::shared_ptr<connection> db, response_type& resp)
{
for (;;) {
auto n = co_await db->async_read_push(net::use_awaitable);
@@ -60,12 +60,11 @@ int main()
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
auto db = std::make_shared<connection>(ioc.get_executor());
db->set_adapter(adapt(resp));
net::co_spawn(ioc.get_executor(), push_reader(db, resp), net::detached);
request<command> req;
req.push(command::hello, 3);
req.push(command::subscribe, "channel1", "channel2");
db->async_exec(req, [&](auto, auto){req.clear();});

View File

@@ -10,7 +10,6 @@
#include <boost/asio.hpp>
#include <boost/system/errc.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
@@ -27,9 +26,8 @@ using aedis::adapter::adapter_t;
using aedis::redis::command;
using aedis::resp3::node;
using aedis::generic::request;
using client_type = aedis::generic::client<net::ip::tcp::socket, command>;
using connection = aedis::generic::connection<command>;
using error_code = boost::system::error_code;
using namespace net::experimental::awaitable_operators;
using net::experimental::as_tuple;
using node_type = aedis::resp3::node<boost::string_view>;
using tcp = net::ip::tcp;
@@ -50,9 +48,9 @@ void test_resolve_error()
};
net::io_context ioc;
client_type::config cfg;
connection::config cfg;
cfg.host = "Atibaia";
client_type db(ioc.get_executor(), adapt(), cfg);
connection db(ioc.get_executor(), adapt(), cfg);
db.async_run(f);
ioc.run();
}
@@ -67,9 +65,9 @@ void test_connect_error()
};
net::io_context ioc;
client_type::config cfg;
connection::config cfg;
cfg.port = "1";
client_type db(ioc.get_executor(), adapt(), cfg);
connection db(ioc.get_executor(), adapt(), cfg);
db.async_run(f);
ioc.run();
}
@@ -80,7 +78,7 @@ void test_connect_error()
void test_quit()
{
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
auto db = std::make_shared<connection>(ioc.get_executor());
request<command> req;
req.push(command::hello, 3);
@@ -101,7 +99,7 @@ void test_quit()
//----------------------------------------------------------------
net::awaitable<void>
push_consumer3(std::shared_ptr<client_type> db)
push_consumer3(std::shared_ptr<connection> db)
{
{
auto [ec, n] = co_await db->async_read_push(as_tuple(net::use_awaitable));
@@ -117,7 +115,7 @@ push_consumer3(std::shared_ptr<client_type> db)
void test_push()
{
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
auto db = std::make_shared<connection>(ioc.get_executor());
request<command> req;
req.push(command::hello, 3);
@@ -141,7 +139,7 @@ void test_push()
////----------------------------------------------------------------
net::awaitable<void> run5(std::shared_ptr<client_type> db)
net::awaitable<void> run5(std::shared_ptr<connection> db)
{
{
request<command> req;
@@ -172,7 +170,7 @@ net::awaitable<void> run5(std::shared_ptr<client_type> db)
void test_reconnect()
{
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor());
auto db = std::make_shared<connection>(ioc.get_executor());
net::co_spawn(ioc.get_executor(), run5(db), net::detached);
ioc.run();
@@ -180,7 +178,7 @@ void test_reconnect()
void test_idle()
{
client_type::config cfg;
connection::config cfg;
cfg.resolve_timeout = std::chrono::seconds{1};
cfg.connect_timeout = std::chrono::seconds{1};
cfg.read_timeout = std::chrono::seconds{1};
@@ -188,7 +186,7 @@ void test_idle()
cfg.ping_delay_timeout = std::chrono::seconds{1};
net::io_context ioc;
auto db = std::make_shared<client_type>(ioc.get_executor(), adapt(), cfg);
auto db = std::make_shared<connection>(ioc.get_executor(), adapt(), cfg);
request<command> req;
req.push(command::hello, 3);

View File

@@ -47,7 +47,7 @@ int main()
auto ep = *std::begin(res);
int n = 100;
for (int i = 0; i < 2; ++i)
for (int i = 0; i < 100; ++i)
net::co_spawn(ioc, example(ep, "Some message\n", n), net::detached);
ioc.run();