
Pass the adapter directly to async_exec.

This commit is contained in:
Marcelo Zimbres
2022-05-28 21:19:00 +02:00
parent 485bdc316b
commit ebef2f9e23
9 changed files with 207 additions and 173 deletions
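
In short: the adapter is no longer stored in the connection (the adapter constructor argument and set_adapter are removed); each async_exec and async_read_push call now takes it directly. A minimal usage sketch assembled from the updated intro example further down; the net and adapt aliases are assumed since they are not shown in the hunks, everything else follows the diff:

#include <iostream>
#include <string>

#include <aedis/aedis.hpp>
#include <aedis/src.hpp>

namespace net = boost::asio;                  // alias assumed, as in the examples
using aedis::adapter::adapt;                  // alias assumed, as in the examples
using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;

auto handler = [](auto ec, auto...) { std::cout << ec.message() << std::endl; };

int main()
{
   std::string resp;

   request<command> req;
   req.push(command::set, "intro-key", "message1");
   req.push(command::get, "intro-key");
   req.push(command::quit);

   net::io_context ioc;
   connection db{ioc};                        // no adapter bound to the connection anymore
   db.async_exec(req, adapt(resp), handler);  // the adapter travels with the call
   db.async_run(handler);
   ioc.run();

   std::cout << resp << std::endl;
}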

View File

@@ -17,6 +17,7 @@
#include <chrono>
#include <queue>
#include <memory>
#include <type_traits>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
@@ -44,16 +45,14 @@ namespace generic {
*/
template <class Command, class AsyncReadWriteStream = boost::asio::ip::tcp::socket>
class connection {
private:
using adapter_type = std::function<void(Command, resp3::node<boost::string_view> const&, boost::system::error_code&)>;
using adapter_type2 = std::function<void(resp3::node<boost::string_view> const&, boost::system::error_code&)>;
public:
/// Executor type.
using executor_type = typename AsyncReadWriteStream::executor_type;
/// Callback type of resp3 operations.
using adapter_type = std::function<void(Command, resp3::node<boost::string_view> const&, boost::system::error_code&)>;
/// resp3 callback type (version without command).
using adapter_type2 = std::function<void(resp3::node<boost::string_view> const&, boost::system::error_code&)>;
/// Type of the last layer
using next_layer_type = AsyncReadWriteStream;
@@ -84,7 +83,7 @@ public:
std::chrono::milliseconds write_timeout = std::chrono::seconds{5};
/// Time after which a ping is sent if no data is received.
std::chrono::milliseconds ping_delay_timeout = std::chrono::seconds{5};
std::chrono::milliseconds ping_interval = std::chrono::seconds{5};
/// The maximum size allowed in a read operation.
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
@@ -95,10 +94,7 @@ public:
* \param ex The executor.
* \param cfg Configuration parameters.
*/
connection(
boost::asio::any_io_executor ex,
adapter_type adapter,
config cfg = config{})
connection(boost::asio::any_io_executor ex, config cfg = config{})
: resv_{ex}
, wait_write_timer_{ex}
, ping_timer_{ex}
@@ -107,19 +103,15 @@ public:
, wait_push_timer_{ex}
, check_idle_timer_{ex}
, cfg_{cfg}
, adapter_{adapter}
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
{ }
{
wait_push_timer_.expires_at(std::chrono::steady_clock::time_point::max());
wait_write_timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
connection(
boost::asio::any_io_executor ex,
adapter_type2 a = adapter::adapt(),
config cfg = config{})
: connection(
ex,
[adapter = std::move(a)] (Command cmd, resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable { if (cmd != Command::ping) adapter(nd, ec); },
cfg)
{}
connection(boost::asio::io_context& ioc, config cfg = config{})
: connection(ioc.get_executor(), cfg)
{ }
/// Returns the executor.
auto get_executor() {return resv_.get_executor();}
@@ -145,13 +137,13 @@ public:
* write it will call the write callback.
*
* @li Starts the idle check operation with the timeout of twice
* the value of connection::config::ping_delay_timeout. If no data is
* the value of connection::config::ping_interval. If no data is
* received during that time interval \c async_run completes with
* generic::error::idle_timeout.
*
* @li Starts the health check operation that sends
* redis::command::ping to Redis with a frequency equal to
* connection::config::ping_delay_timeout.
* connection::config::ping_interval.
*
* In addition to the callbacks mentioned above, the read
* operations will call the resp3 callback as soon as new chunks of
@@ -200,42 +192,53 @@ public:
/** @brief Asynchronously schedules a command for execution.
*/
template <class CompletionToken = default_completion_token_type>
template <
class Adapter = adapter::detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type,
std::enable_if_t<std::is_convertible<Adapter, adapter_type>::value, bool> = true>
auto async_exec(
request_type const& req,
Adapter adapter = adapter::adapt(),
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<connection>{this, &req}, token, resv_);
>(detail::exec_op<connection, Adapter>{this, &req, adapter}, token, resv_);
}
template <
class Adapter = adapter::detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type,
std::enable_if_t<std::is_convertible<Adapter, adapter_type2>::value, bool> = true>
auto async_exec(
request_type const& req,
Adapter adapter = adapter::adapt(),
CompletionToken token = CompletionToken{})
{
auto wrap = [adapter]
(Command, resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable
{ adapter(nd, ec); };
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::exec_op<connection, decltype(wrap)>{this, &req, wrap}, token, resv_);
}
/** @brief Receives events produced by the run operation.
*/
template <class CompletionToken = default_completion_token_type>
auto async_read_push(CompletionToken token = CompletionToken{})
template <
class Adapter = adapter::detail::response_traits<void>::adapter_type,
class CompletionToken = default_completion_token_type>
auto
async_read_push(
Adapter adapter = adapter::adapt(),
CompletionToken token = CompletionToken{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code, std::size_t)
>(detail::read_push_op<connection, Command>{this}, token, resv_);
}
/// Set the response adapter.
void set_adapter(adapter_type adapter)
{ adapter_ = std::move(adapter); }
/// Set the response adapter.
void set_adapter(adapter_type2 a)
{
adapter_ =
[adapter = std::move(a)]
(Command cmd, resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable
{
if (cmd != Command::ping)
adapter(nd, ec);
};
>(detail::read_push_op<connection, Command, Adapter>{this, adapter}, token, resv_);
}
/** @brief Closes the connection with the database.
@@ -244,6 +247,7 @@ public:
*/
void close()
{
//std::cout << "close: closing." << std::endl;
socket_->close();
wait_read_timer_.expires_at(std::chrono::steady_clock::now());
wait_push_timer_.expires_at(std::chrono::steady_clock::now());
@@ -259,10 +263,11 @@ public:
private:
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
template <class T, class V> friend struct detail::reader_op;
template <class T, class V> friend struct detail::ping_op;
template <class T, class V> friend struct detail::run_op;
template <class T, class V> friend struct detail::read_push_op;
template <class T, class U, class V> friend struct detail::read_push_op;
template <class T, class U> friend struct detail::reader_op;
template <class T, class U> friend struct detail::ping_op;
template <class T, class U> friend struct detail::run_op;
template <class T, class U> friend struct detail::exec_op;
template <class T> friend struct detail::exec_internal_impl_op;
template <class T> friend struct detail::exec_internal_op;
template <class T> friend struct detail::write_op;
@@ -272,7 +277,6 @@ private:
template <class T> friend struct detail::resolve_with_timeout_op;
template <class T> friend struct detail::check_idle_op;
template <class T> friend struct detail::read_write_check_ping_op;
template <class T> friend struct detail::exec_op;
void
add_request(
@@ -283,10 +287,12 @@ private:
reqs_.push_back({timer, req.commands().size()});
n_cmds_next_ += req.commands().size();
payload_next_ += req.payload();
BOOST_ASSERT(!payload_next_.empty());
for (auto cmd : req.commands())
cmds_.push(cmd.first);
if (can_write) {
BOOST_ASSERT(n_cmds_ == 0);
//std::cout << "add_request: Requesting a write2." << std::endl;
wait_write_timer_.cancel_one();
}
}
@@ -296,16 +302,6 @@ private:
return boost::asio::dynamic_buffer(read_buffer_, cfg_.max_read_size);
}
auto select_adapter(Command cmd)
{
return [this, cmd]
(resp3::node<boost::string_view> const& nd,
boost::system::error_code& ec) mutable
{
adapter_(cmd, nd, ec);
};
}
// Calls connection::async_resolve with the resolve timeout passed in
// the config. Uses the write_timer_ to perform the timeout op.
template <class CompletionToken = default_completion_token_type>
@@ -373,7 +369,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::writer_op<connection>{this}, token, wait_write_timer_);
>(detail::writer_op<connection>{this}, token, resv_);
}
template <class CompletionToken = default_completion_token_type>
@@ -443,10 +439,6 @@ private:
// Configuration parameters.
config cfg_;
// Called by the parser after each new chunk of resp3 data is
// processed.
adapter_type adapter_;
// Buffer used by the read operations.
std::string read_buffer_;
@@ -455,6 +447,7 @@ private:
std::size_t n_cmds = 0;
};
std::size_t waiting_pushes_ = 0;
std::size_t n_cmds_ = 0;
std::size_t n_cmds_next_ = 0;
std::string payload_;
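
For reference, the two async_exec overloads above are disambiguated purely by std::enable_if_t on whether the adapter is convertible to the command-aware adapter_type or to the command-less adapter_type2. A self-contained sketch of that dispatch, with stand-in types rather than the library's:

#include <functional>
#include <iostream>
#include <system_error>
#include <type_traits>

// Stand-ins for Command, resp3::node<boost::string_view> and boost::system::error_code.
struct node {};
enum class command { ping, get };

using adapter_type  = std::function<void(command, node const&, std::error_code&)>;
using adapter_type2 = std::function<void(node const&, std::error_code&)>;

// Selected when the adapter also takes the command (matches adapter_type).
template <
   class Adapter,
   std::enable_if_t<std::is_convertible<Adapter, adapter_type>::value, bool> = true>
void exec(Adapter) { std::cout << "command-aware adapter\n"; }

// Selected when the adapter takes only the node and error code (matches adapter_type2).
template <
   class Adapter,
   std::enable_if_t<std::is_convertible<Adapter, adapter_type2>::value, bool> = true>
void exec(Adapter) { std::cout << "plain adapter\n"; }

int main()
{
   exec([](command, node const&, std::error_code&) {}); // picks the first overload
   exec([](node const&, std::error_code&) {});          // picks the second overload
}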

View File

@@ -22,6 +22,7 @@
#include <aedis/resp3/write.hpp>
#include <aedis/generic/error.hpp>
#include <aedis/redis/command.hpp>
#include <aedis/adapter/adapt.hpp>
namespace aedis {
namespace generic {
@@ -80,7 +81,7 @@ struct exec_internal_op {
reenter (coro)
{
// Idle timeout.
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_delay_timeout);
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_interval);
yield
boost::asio::experimental::make_parallel_group(
@@ -115,9 +116,10 @@ struct exec_internal_op {
}
};
template <class Conn, class Command>
template <class Conn, class Command, class Adapter>
struct read_push_op {
Conn* cli;
Adapter adapter;
boost::asio::coroutine coro;
template <class Self>
@@ -128,31 +130,41 @@ struct read_push_op {
{
reenter (coro)
{
cli->wait_push_timer_.expires_at(std::chrono::steady_clock::time_point::max());
yield cli->wait_push_timer_.async_wait(std::move(self));
if (!cli->socket_->is_open()) {
self.complete(ec, 0);
return;
//std::cout << "push_op: waiting to process push." << std::endl;
if (cli->waiting_pushes_ == 0) {
yield cli->wait_push_timer_.async_wait(std::move(self));
if (!cli->socket_->is_open()) {
self.complete(ec, 0);
return;
}
//std::cout << "push_op: After wait." << std::endl;
BOOST_ASSERT(cli->waiting_pushes_ == 1);
}
//std::cout << "push_op: starting to process the push." << std::endl;
yield
resp3::async_read(
*cli->socket_,
cli->make_dynamic_buffer(),
cli->select_adapter(Command::invalid),
adapter,
std::move(self));
//std::cout << "push_op: finish, calling the reader_op." << std::endl;
cli->wait_read_timer_.cancel_one();
cli->waiting_pushes_ = 0;
self.complete(ec, n);
return;
}
}
};
template <class Conn>
template <class Conn, class Adapter>
struct exec_op {
Conn* cli;
typename Conn::request_type const* req;
Adapter adapter;
std::shared_ptr<boost::asio::steady_timer> timer;
std::size_t read_size = 0;
boost::asio::coroutine coro;
@@ -171,6 +183,7 @@ struct exec_op {
timer->expires_at(std::chrono::steady_clock::time_point::max());
cli->add_request(*req, timer);
//std::cout << "exec_op: waiting to process a read " << cli->reqs_.size() << std::endl;
// Notice we use the back of the queue.
yield timer->async_wait(std::move(self));
if (!cli->socket_->is_open()) {
@@ -178,16 +191,31 @@ struct exec_op {
return;
}
// Notice we use the front of the queue.
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(cli->reqs_.front().n_cmds != 0);
if (cli->reqs_.front().n_cmds == 0) {
// Some requests don't have a response, so we have to exit
// the operation early.
cli->reqs_.pop_front(); // TODO: Recycle timers.
// If there is no ongoing push-read operation we can
// request the timer to proceed, otherwise we can just
// exit.
if (cli->waiting_pushes_ == 0) {
//std::cout << "exec_op: requesting read op to proceed." << std::endl;
cli->wait_read_timer_.cancel_one();
}
self.complete({}, 0);
return;
}
// Notice we use the front of the queue.
while (cli->reqs_.front().n_cmds != 0) {
BOOST_ASSERT(!cli->cmds_.empty());
yield
resp3::async_read(
*cli->socket_,
cli->make_dynamic_buffer(),
cli->select_adapter(cli->cmds_.front()),
[adpt = adapter, cmd = cli->cmds_.front()] (resp3::node<boost::string_view> const& nd, boost::system::error_code& ec) mutable { adpt(cmd, nd, ec); },
std::move(self));
if (ec) {
cli->close();
@@ -214,9 +242,12 @@ struct exec_op {
// We are done with the pipeline and can resume listening
// on the socket and send pending requests if there are
// any.
//std::cout << "exec_op: requesting read op to proceed." << std::endl;
cli->wait_read_timer_.cancel_one();
if (!cli->reqs_.empty())
if (!cli->reqs_.empty()) {
//std::cout << "exec_op: Requesting a write." << std::endl;
cli->wait_write_timer_.cancel_one();
}
} else {
// We are not done with the pipeline and can continue
// reading.
@@ -243,22 +274,18 @@ struct ping_op {
{
reenter (coro) for (;;)
{
cli->ping_timer_.expires_after(cli->cfg_.ping_delay_timeout);
//std::cout << "ping_op: waiting to send a ping." << std::endl;
cli->ping_timer_.expires_after(cli->cfg_.ping_interval);
yield cli->ping_timer_.async_wait(std::move(self));
if (ec) {
// The timer has been canceled, continue.
self.complete(ec);
return;
}
// The timer fired, send the ping. If there is an ongoing
// command there is no need to send a new one.
if (!cli->reqs_.empty())
continue;
//std::cout << "ping_op: Sending a command." << std::endl;
cli->req_.clear();
cli->req_.push(Command::ping);
yield cli->async_exec(cli->req_, std::move(self));
yield cli->async_exec(cli->req_, adapter::adapt(), std::move(self));
if (ec) {
self.complete(ec);
return;
@@ -277,7 +304,7 @@ struct check_idle_op {
{
reenter (coro) for (;;)
{
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_delay_timeout);
cli->check_idle_timer_.expires_after(2 * cli->cfg_.ping_interval);
yield cli->check_idle_timer_.async_wait(std::move(self));
if (ec) {
self.complete(ec);
@@ -285,7 +312,7 @@ struct check_idle_op {
}
auto const now = std::chrono::steady_clock::now();
if (cli->last_data_ + (2 * cli->cfg_.ping_delay_timeout) < now) {
if (cli->last_data_ + (2 * cli->cfg_.ping_interval) < now) {
cli->close();
self.complete(error::idle_timeout);
return;
@@ -415,7 +442,7 @@ struct read_write_check_ping_op {
reenter (coro)
{
// Starts the reader and writer ops.
cli->wait_write_timer_.expires_at(std::chrono::steady_clock::time_point::max());
//std::cout << "read_write_check_ping_op: Setting the timer." << std::endl;
yield
boost::asio::experimental::make_parallel_group(
@@ -512,6 +539,7 @@ struct write_op {
{
reenter (coro)
{
//std::cout << "write_op: before." << std::endl;
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(!cli->payload_next_.empty());
@@ -526,6 +554,19 @@ struct write_op {
*cli->socket_,
boost::asio::buffer(cli->payload_),
std::move(self));
//std::cout << "write_op: after." << std::endl;
BOOST_ASSERT(!cli->reqs_.empty());
if (cli->reqs_.front().n_cmds == 0) {
// Some requests don't have a response, so their timers
// won't be canceled by the read op; we have to do it here.
cli->reqs_.front().timer->cancel_one();
// Notice we don't have to call
// cli->wait_read_timer_.cancel_one(); as that operation
// is ongoing.
self.complete({}, n);
return;
}
cli->payload_.clear();
self.complete(ec, n);
@@ -557,6 +598,8 @@ struct write_with_timeout_op {
boost::asio::experimental::wait_for_one(),
std::move(self));
//std::cout << "write_with_timeout_op: completed." << std::endl;
switch (order[0]) {
case 0:
{
@@ -599,7 +642,6 @@ struct writer_op {
// The timer must however be canceled so we can unblock the
// channel.
//if (cli->n_cmds_ == 0 && !cli->reqs_.empty()) {
if (!cli->reqs_.empty()) {
yield cli->async_write_with_timeout(std::move(self));
if (ec) {
@@ -609,12 +651,14 @@ struct writer_op {
}
}
//std::cout << "writer_op: waiting to write." << std::endl;
yield cli->wait_write_timer_.async_wait(std::move(self));
if (!cli->socket_->is_open()) {
self.complete(error::write_stop_requested);
return;
}
//std::cout << "writer_op: Write requested: " << ec.message() << std::endl;
}
}
};
@@ -633,6 +677,7 @@ struct reader_op {
reenter (coro) for (;;)
{
//std::cout << "reader_op: waiting data." << std::endl;
yield
boost::asio::async_read_until(
*cli->socket_,
@@ -653,19 +698,26 @@ struct reader_op {
cli->last_data_ = std::chrono::steady_clock::now();
if (resp3::to_type(cli->read_buffer_.front()) == resp3::type::push) {
//std::cout << "reader_op: Requesting the push op." << std::endl;
cli->waiting_pushes_ = 1;
cli->wait_push_timer_.cancel_one();
} else {
//std::cout << "reader_op: Requesting the read op." << std::endl;
BOOST_ASSERT(!cli->cmds_.empty());
BOOST_ASSERT(cli->reqs_.front().n_cmds != 0);
cli->reqs_.front().timer->cancel_one();
}
//std::cout << "reader_op: waiting to read." << std::endl;
cli->wait_read_timer_.expires_after(cli->cfg_.read_timeout);
yield cli->wait_read_timer_.async_wait(std::move(self));
//std::cout << "reader_op: after wait: " << ec.message() << std::endl;
if (!ec) {
//std::cout << "reader_op: error1." << std::endl;
self.complete(error::read_timeout);
return;
}
if (!cli->socket_->is_open()) {
self.complete(ec);
return;
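
Since select_adapter is gone, exec_op now builds the node-only callable that resp3::async_read expects inline, binding the command at the front of the queue into the user's command-aware adapter (see the lambda passed to resp3::async_read above). A standalone sketch of that wrapping, again with stand-in types:

#include <iostream>
#include <queue>
#include <string>
#include <system_error>

// Stand-ins for Command, resp3::node<boost::string_view> and boost::system::error_code.
struct node { std::string value; };
enum class command { ping, get };

int main()
{
   std::queue<command> cmds;   // stand-in for the connection's cmds_ queue
   cmds.push(command::get);

   // A command-aware adapter, as passed to async_exec.
   auto adapter = [](command cmd, node const& nd, std::error_code&) {
      std::cout << static_cast<int>(cmd) << ": " << nd.value << "\n";
   };

   // The wrapping done in exec_op: capture the command at the front of the
   // queue and expose a node-only callable for the resp3 reader.
   auto wrap = [adpt = adapter, cmd = cmds.front()]
      (node const& nd, std::error_code& ec) mutable { adpt(cmd, nd, ec); };

   std::error_code ec;
   wrap(node{"PONG"}, ec);     // prints "1: PONG"
}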

View File

@@ -60,7 +60,7 @@ private:
request<command> req;
req.push(command::publish, "channel", msg);
req.push(command::incr, "chat-room-counter");
co_await db->async_exec(req, net::use_awaitable);
co_await db->async_exec(req, adapt(*resp), net::use_awaitable);
std::cout << "Messsages so far: " << resp->at(1).value << std::endl;
resp->clear();
msg.erase(0, n);
@@ -101,13 +101,13 @@ private:
using sessions_type = std::vector<std::shared_ptr<user_session>>;
net::awaitable<void>
push_reader(
reader(
std::shared_ptr<connection> db,
std::shared_ptr<response_type> resp,
std::shared_ptr<sessions_type> sessions)
{
for (;;) {
co_await db->async_read_push(net::use_awaitable);
co_await db->async_read_push(adapt(*resp), net::use_awaitable);
for (auto& session: *sessions)
session->deliver(resp->at(3).value);
@@ -131,26 +131,28 @@ listener(
}
}
auto handler =[](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
int main()
{
try {
net::io_context ioc{1};
// Redis client and receiver.
auto db = std::make_shared<connection>(ioc.get_executor());
db->async_run([](auto ec){ std::cout << ec.message() << std::endl;});
auto db = std::make_shared<connection>(ioc);
db->async_run(handler);
// Sends hello and subscribes to the channel. Ignores the
// response.
request<command> req;
req.push(command::subscribe, "channel");
db->async_exec(req, [](auto, auto){});
db->async_exec(req, adapt(), handler);
auto resp = std::make_shared<response_type>();
db->set_adapter(adapt(*resp));
auto sessions = std::make_shared<sessions_type>();
net::co_spawn(ioc.get_executor(), push_reader(db, resp, sessions), net::detached);
net::co_spawn(ioc, reader(db, resp, sessions), net::detached);
// TCP acceptor.
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
@@ -160,8 +162,8 @@ int main()
// Signal handler.
net::signal_set signals(ioc.get_executor(), SIGINT, SIGTERM);
signals.async_wait([acc, db] (auto, int) {
acc->cancel();
db->close();
acc->cancel();
db->close();
});
ioc.run();

View File

@@ -37,7 +37,7 @@ echo_loop(
req.push(command::incr, "echo-server-counter");
req.push(command::set, "echo-server-key", msg);
req.push(command::get, "echo-server-key");
co_await db->async_exec(req, net::use_awaitable);
co_await db->async_exec(req, adapt(*resp), net::use_awaitable);
resp->at(0).value += ": ";
resp->at(0).value += resp->at(2).value;
co_await net::async_write(socket, net::buffer(resp->at(0).value));
@@ -52,10 +52,10 @@ echo_loop(
net::awaitable<void>
listener(
std::shared_ptr<tcp_acceptor> acc,
std::shared_ptr<connection> db,
std::shared_ptr<response_type> resp)
std::shared_ptr<connection> db)
{
auto ex = co_await net::this_coro::executor;
auto resp = std::make_shared<response_type>();
for (;;) {
auto socket = co_await acc->async_accept();
@@ -63,20 +63,22 @@ listener(
}
}
auto handler =[](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
int main()
{
try {
net::io_context ioc;
// Redis client.
auto resp = std::make_shared<response_type>();
auto db = std::make_shared<connection>(ioc.get_executor(), adapt(*resp));
db->async_run([](auto ec){ std::cout << ec.message() << std::endl;});
auto db = std::make_shared<connection>(ioc);
db->async_run(handler);
// TCP acceptor.
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<tcp_acceptor>(ioc.get_executor(), endpoint);
co_spawn(ioc, listener(acc, db, resp), net::detached);
auto acc = std::make_shared<tcp_acceptor>(ioc, endpoint);
co_spawn(ioc, listener(acc, db), net::detached);
ioc.run();
} catch (std::exception const& e) {

View File

@@ -8,6 +8,7 @@
#include <memory>
#include <vector>
#include <cstdio>
#include <tuple>
#include <aedis/aedis.hpp>
#include <aedis/src.hpp>
@@ -19,33 +20,22 @@ using aedis::redis::command;
using aedis::generic::request;
using connection = aedis::generic::connection<command>;
auto run_handler =[](auto ec)
{
std::printf("Run: %s\n", ec.message().data());
};
auto exec_handler = [](auto ec, std::size_t read_size)
{
std::printf("Exec: %s %lu\n", ec.message().data(), read_size);
};
auto handler =[](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
int main()
{
std::vector<node<std::string>> resp;
request<command> req;
req.push(command::ping, "Ping example");
req.push(command::quit);
std::string resp;
net::io_context ioc;
connection db{ioc.get_executor(), adapt(resp)};
request<command> req;
req.push(command::set, "intro-key", "message1");
req.push(command::get, "intro-key");
req.push(command::quit);
db.async_exec(req, exec_handler);
db.async_exec(req, exec_handler);
db.async_run(run_handler);
connection db{ioc};
db.async_exec(req, adapt(resp), handler);
db.async_run(handler);
ioc.run();
for (auto const& e: resp)
std::cout << e.value << std::endl;
std::cout << resp << std::endl;
}

View File

@@ -29,6 +29,9 @@ using T0 = std::vector<aedis::resp3::node<std::string>>;
using T1 = std::set<std::string>;
using T2 = std::map<std::string, std::string>;
auto handler =[](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
int main()
{
T0 resp0;
@@ -50,7 +53,7 @@ int main()
};
net::io_context ioc;
connection db{ioc.get_executor(), adapter};
connection db{ioc};
std::vector<int> vec
{1, 2, 3, 4, 5, 6};
@@ -73,13 +76,8 @@ int main()
req.push(command::hgetall, "hset-key");
req.push(command::quit);
db.async_exec(req, [](auto ec, std::size_t read_size) {
std::printf("Exec: %s %lu\n", ec.message().data(), read_size);
});
db.async_run([](auto ec) {
std::printf("Run: %s\n", ec.message().data());
});
db.async_exec(req, adapter, handler);
db.async_run(handler);
ioc.run();

View File

@@ -33,11 +33,11 @@ using response_type = std::vector<aedis::resp3::node<std::string>>;
* example.
*/
net::awaitable<void>
push_reader(std::shared_ptr<connection> db, response_type& resp)
net::awaitable<void> reader(std::shared_ptr<connection> db)
{
response_type resp;
for (;;) {
auto n = co_await db->async_read_push(net::use_awaitable);
auto n = co_await db->async_read_push(adapt(resp), net::use_awaitable);
std::cout
<< "Size: " << n << "\n"
<< "Event: " << resp.at(1).value << "\n"
@@ -49,25 +49,22 @@ push_reader(std::shared_ptr<connection> db, response_type& resp)
}
}
auto run_handler =[](auto ec)
net::awaitable<void> subscriber(std::shared_ptr<connection> db)
{
std::printf("Run: %s\n", ec.message().data());
};
request<command> req;
req.push(command::subscribe, "channel1", "channel2");
co_await db->async_exec(req, adapt(), net::use_awaitable);
}
auto handler = [](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
int main()
{
response_type resp;
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor());
db->set_adapter(adapt(resp));
net::co_spawn(ioc.get_executor(), push_reader(db, resp), net::detached);
request<command> req;
req.push(command::subscribe, "channel1", "channel2");
db->async_exec(req, [&](auto, auto){req.clear();});
db->async_run(run_handler);
auto db = std::make_shared<connection>(ioc);
net::co_spawn(ioc, reader(db), net::detached);
net::co_spawn(ioc, subscriber(db), net::detached);
db->async_run(handler);
ioc.run();
}

View File

@@ -50,7 +50,7 @@ void test_resolve_error()
net::io_context ioc;
connection::config cfg;
cfg.host = "Atibaia";
connection db(ioc.get_executor(), adapt(), cfg);
connection db(ioc, cfg);
db.async_run(f);
ioc.run();
}
@@ -67,7 +67,7 @@ void test_connect_error()
net::io_context ioc;
connection::config cfg;
cfg.port = "1";
connection db(ioc.get_executor(), adapt(), cfg);
connection db(ioc, cfg);
db.async_run(f);
ioc.run();
}
@@ -78,11 +78,11 @@ void test_connect_error()
void test_quit()
{
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor());
auto db = std::make_shared<connection>(ioc);
request<command> req;
req.push(command::quit);
db->async_exec(req, [](auto ec, auto r){
db->async_exec(req, adapt(), [](auto ec, auto r){
expect_no_error(ec);
//expect_eq(w, 36UL);
//expect_eq(r, 152UL);
@@ -101,12 +101,12 @@ net::awaitable<void>
push_consumer3(std::shared_ptr<connection> db)
{
{
auto [ec, n] = co_await db->async_read_push(as_tuple(net::use_awaitable));
auto [ec, n] = co_await db->async_read_push(adapt(), as_tuple(net::use_awaitable));
expect_no_error(ec);
}
{
auto [ec, n] = co_await db->async_read_push(as_tuple(net::use_awaitable));
auto [ec, n] = co_await db->async_read_push(adapt(), as_tuple(net::use_awaitable));
expect_error(ec, boost::asio::error::operation_aborted);
}
}
@@ -114,13 +114,13 @@ push_consumer3(std::shared_ptr<connection> db)
void test_push()
{
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor());
auto db = std::make_shared<connection>(ioc);
request<command> req;
req.push(command::subscribe, "channel");
req.push(command::quit);
db->async_exec(req, [](auto ec, auto r){
db->async_exec(req, adapt(), [](auto ec, auto r){
expect_no_error(ec);
//expect_eq(w, 68UL);
//expect_eq(r, 151UL);
@@ -142,7 +142,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
{
request<command> req;
req.push(command::quit);
db->async_exec(req, [](auto ec, auto){
db->async_exec(req, adapt(), [](auto ec, auto){
expect_no_error(ec);
});
@@ -153,7 +153,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
{
request<command> req;
req.push(command::quit);
db->async_exec(req, [](auto ec, auto){
db->async_exec(req, adapt(), [](auto ec, auto){
expect_no_error(ec);
});
@@ -168,7 +168,7 @@ void test_reconnect()
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor());
net::co_spawn(ioc.get_executor(), run5(db), net::detached);
net::co_spawn(ioc, run5(db), net::detached);
ioc.run();
}
@@ -179,15 +179,15 @@ void test_idle()
cfg.connect_timeout = std::chrono::seconds{1};
cfg.read_timeout = std::chrono::seconds{1};
cfg.write_timeout = std::chrono::seconds{1};
cfg.ping_delay_timeout = std::chrono::seconds{1};
cfg.ping_interval = std::chrono::seconds{1};
net::io_context ioc;
auto db = std::make_shared<connection>(ioc.get_executor(), adapt(), cfg);
auto db = std::make_shared<connection>(ioc, cfg);
request<command> req;
req.push(command::client, "PAUSE", 5000);
db->async_exec(req, [](auto ec, auto r){
db->async_exec(req, adapt(), [](auto ec, auto r){
expect_no_error(ec);
});
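
Note that the rename from ping_delay_timeout to ping_interval also drives the idle check, which waits for twice that value before failing with generic::error::idle_timeout (see check_idle_op above). A configuration sketch along the lines of test_idle, with the net alias assumed and all other config fields left at their defaults:

#include <chrono>
#include <iostream>
#include <memory>

#include <aedis/aedis.hpp>
#include <aedis/src.hpp>

namespace net = boost::asio;   // alias assumed, as in the examples
using aedis::redis::command;
using connection = aedis::generic::connection<command>;

int main()
{
   connection::config cfg;
   cfg.ping_interval = std::chrono::seconds{1};  // a ping is sent every second while idle

   net::io_context ioc;
   auto db = std::make_shared<connection>(ioc, cfg);

   // If no data arrives for 2 * cfg.ping_interval, async_run completes with
   // generic::error::idle_timeout.
   db->async_run([](auto ec, auto...) { std::cout << ec.message() << std::endl; });
   ioc.run();
}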

View File

@@ -46,8 +46,8 @@ int main()
auto const res = resv.resolve("127.0.0.1", "55555");
auto ep = *std::begin(res);
int n = 100;
for (int i = 0; i < 100; ++i)
int n = 10000;
for (int i = 0; i < 500; ++i)
net::co_spawn(ioc, example(ep, "Some message\n", n), net::detached);
ioc.run();