
Refactoring.

Marcelo Zimbres
2022-06-12 08:44:37 +02:00
parent 77fe3a0f5f
commit ce9cb04168
5 changed files with 290 additions and 214 deletions
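
The gist of this refactoring: the internal hand-offs that used to be done by cancelling steady_timers parked at time_point::max() (wait_write_timer_, wait_read_timer_, wait_push_timer_) are replaced by two boost::asio::experimental::channel members (read_channel_ and push_channel_), the standalone writer_op is removed, and the write is performed inside exec_op. Below is a minimal, self-contained sketch of that channel hand-off pattern, not part of the commit; the names producer and consumer and the payload value 1 are illustrative only.

#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <iostream>

namespace net = boost::asio;
using channel_type =
   net::experimental::channel<void(boost::system::error_code, std::size_t)>;

net::awaitable<void> producer(channel_type& ch)
{
   // Signals the waiting side that an event (e.g. a server push) is ready,
   // instead of cancelling a timer that never expires on its own.
   co_await ch.async_send(boost::system::error_code{}, 1, net::use_awaitable);
}

net::awaitable<void> consumer(channel_type& ch)
{
   // Suspends until the producer sends through the channel.
   auto n = co_await ch.async_receive(net::use_awaitable);
   std::cout << "received event " << n << "\n";
}

int main()
{
   net::io_context ioc;
   channel_type ch{ioc};
   net::co_spawn(ioc, consumer(ch), net::detached);
   net::co_spawn(ioc, producer(ch), net::detached);
   ioc.run();
}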

View File

@@ -16,6 +16,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <aedis/adapt.hpp>
#include <aedis/resp3/request.hpp>
@@ -72,17 +73,14 @@ public:
*/
connection(boost::asio::any_io_executor ex, config cfg = config{})
: resv_{ex}
, wait_write_timer_{ex}
, ping_timer_{ex}
, write_timer_{ex}
, wait_read_timer_{ex}
, wait_push_timer_{ex}
, read_channel_{ex}
, push_channel_{ex}
, check_idle_timer_{ex}
, cfg_{cfg}
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
{
wait_push_timer_.expires_at(std::chrono::steady_clock::time_point::max());
wait_write_timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
connection(boost::asio::io_context& ioc, config cfg = config{})
@@ -228,9 +226,8 @@ public:
void close()
{
socket_->close();
wait_read_timer_.expires_at(std::chrono::steady_clock::now());
wait_push_timer_.expires_at(std::chrono::steady_clock::now());
wait_write_timer_.expires_at(std::chrono::steady_clock::now());
read_channel_.cancel();
push_channel_.cancel();
check_idle_timer_.expires_at(std::chrono::steady_clock::now());
ping_timer_.cancel();
for (auto& e: reqs_) {
@@ -251,7 +248,6 @@ private:
template <class T, class U> friend struct detail::exec_op;
template <class T, class U> friend struct detail::runexec_op;
template <class T> friend struct detail::exec_internal_op;
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::connect_with_timeout_op;
template <class T> friend struct detail::resolve_with_timeout_op;
template <class T> friend struct detail::check_idle_op;
@@ -263,19 +259,18 @@ private:
bool stop = false;
};
void add_request(resp3::request const& req)
bool add_request(resp3::request const& req)
{
BOOST_ASSERT(!req.payload().empty());
auto const can_write = reqs_.empty();
reqs_.push_back(make_req_info(req.commands().size()));
reqs_.back()->timer.expires_at(std::chrono::steady_clock::time_point::max());
n_cmds_next_ += req.commands().size();
payload_next_ += req.payload();
for (auto cmd : req.commands())
cmds_.push(cmd.first);
if (can_write) {
BOOST_ASSERT(n_cmds_ == 0);
wait_write_timer_.cancel_one();
}
return can_write;
}
auto make_dynamic_buffer()
@@ -318,16 +313,6 @@ private:
>(detail::reader_op<connection>{this}, token, resv_.get_executor());
}
template <class CompletionToken = default_completion_token_type>
auto
writer(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::writer_op<connection>{this}, token, resv_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_read_write_check_ping(CompletionToken&& token = default_completion_token_type{})
@@ -389,14 +374,15 @@ private:
pool_.push_back(info);
}
using channel_type = boost::asio::experimental::channel<void(boost::system::error_code, std::size_t)>;
// IO objects
boost::asio::ip::tcp::resolver resv_;
std::shared_ptr<AsyncReadWriteStream> socket_;
boost::asio::steady_timer wait_write_timer_;
boost::asio::steady_timer ping_timer_;
boost::asio::steady_timer write_timer_;
boost::asio::steady_timer wait_read_timer_;
boost::asio::steady_timer wait_push_timer_;
channel_type read_channel_;
channel_type push_channel_;
boost::asio::steady_timer check_idle_timer_;
// Configuration parameters.
@@ -405,7 +391,6 @@ private:
// Buffer used by the read operations.
std::string read_buffer_;
std::size_t waiting_pushes_ = 0;
std::size_t n_cmds_ = 0;
std::size_t n_cmds_next_ = 0;
std::string payload_;
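
The members above come in current/next pairs: new requests are appended to payload_next_ and counted in n_cmds_next_ while a write may be in flight, and only when no request is being processed (n_cmds_ == 0) is the pending batch swapped into payload_/n_cmds_ and written as a single pipeline. A minimal sketch of that double-buffering idea follows; it is an illustration only, and the names pipeline_buffers, queue and promote are not part of the library.

#include <cstddef>
#include <string>
#include <utility>

struct pipeline_buffers {
   std::string payload;       // batch currently being written
   std::string payload_next;  // accumulates requests issued meanwhile
   std::size_t n_cmds = 0;
   std::size_t n_cmds_next = 0;

   // Called for every new request (cf. add_request).
   void queue(std::string const& req_payload, std::size_t n_commands)
   {
      payload_next += req_payload;
      n_cmds_next += n_commands;
   }

   // Called when no write is ongoing: promote the pending batch so a
   // single async_write can send the whole pipeline.
   void promote()
   {
      std::swap(n_cmds_next, n_cmds);
      std::swap(payload_next, payload);
      n_cmds_next = 0;
      payload_next.clear();
   }
};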

View File

@@ -93,6 +93,7 @@ template <class Conn, class Adapter>
struct read_push_op {
Conn* cli;
Adapter adapter;
std::size_t read_size;
boost::asio::coroutine coro;
template <class Self>
@@ -103,14 +104,10 @@ struct read_push_op {
{
reenter (coro)
{
if (cli->waiting_pushes_ == 0) {
yield cli->wait_push_timer_.async_wait(std::move(self));
if (!cli->socket_->is_open()) {
self.complete(ec, 0);
return;
}
BOOST_ASSERT(cli->waiting_pushes_ == 1);
yield cli->push_channel_.async_receive(std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
yield
@@ -120,9 +117,21 @@ struct read_push_op {
[adpt = adapter](resp3::node<boost::string_view> const& n, boost::system::error_code& ec) mutable { adpt(std::size_t(-1), command::invalid, n, ec);},
std::move(self));
cli->wait_read_timer_.cancel_one();
cli->waiting_pushes_ = 0;
self.complete(ec, n);
if (ec) {
self.complete(ec, 0);
return;
}
read_size = 0;
yield
cli->push_channel_.async_send(boost::system::error_code{}, 0, std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
self.complete(ec, read_size);
return;
}
}
@@ -146,41 +155,142 @@ struct exec_op {
{
reenter (coro)
{
cli->add_request(*req);
// Notice we use the back of the queue.
info = cli->reqs_.back();
info->timer.expires_at(std::chrono::steady_clock::time_point::max());
yield info->timer.async_wait(std::move(self));
if (info->stop) {
cli->release_req_info(info);
self.complete(ec, 0);
return;
// add_request will add the request payload to the buffer and
// return true if it can be written to the socket, i.e. there
// is no ongoing request.
if (cli->add_request(*req) && cli->socket_) {
// We can proceed and write the request to the socket.
BOOST_ASSERT(cli->n_cmds_ == 0);
info = cli->reqs_.back();
} else {
// There is an ongoing request being processed. When the
// response to this specific request arrives, the timer
// below will be canceled, either at the end of this
// operation (if it is in the middle of a pipeline) or in
// the reader_op (if it is the first in the pipeline).
// Notice we use the back of the queue.
info = cli->reqs_.back();
yield info->timer.async_wait(std::move(self));
if (info->stop) {
cli->release_req_info(info);
self.complete(boost::asio::error::basic_errors::operation_aborted, 0);
return;
}
}
//----------------------------------------------------------------
// Write operation.
BOOST_ASSERT(!cli->reqs_.empty());
if (cli->reqs_.front()->n_cmds == 0) {
// Some requests don't have a response, so we have to exit
// the operation early.
cli->release_req_info(info);
cli->reqs_.pop_front();
// If there is no ongoing push-read operation we can
// request the reader to proceed, otherwise we can just
// exit.
if (cli->waiting_pushes_ == 0)
cli->wait_read_timer_.cancel_one();
// If n_cmds_ is zero there is no ongoing request being
// processed, so we can write. Otherwise, the payload
// corresponding to this request has already been sent in
// a previous pipeline, so there is nothing to send.
if (cli->n_cmds_ == 0) {
BOOST_ASSERT(!cli->payload_next_.empty());
self.complete({}, 0);
return;
// Copies the request to variables that won't be touched
// while async_write is suspended.
std::swap(cli->n_cmds_next_, cli->n_cmds_);
std::swap(cli->payload_next_, cli->payload_);
cli->n_cmds_next_ = 0;
cli->payload_next_.clear();
cli->write_timer_.expires_after(cli->cfg_.write_timeout);
yield aedis::detail::async_write(
*cli->socket_, cli->write_timer_, boost::asio::buffer(cli->payload_),
std::move(self));
if (ec) {
cli->close();
self.complete(ec, 0);
return;
}
// A stop may have been requested while the write
// operation was suspended.
if (info->stop) {
self.complete(boost::asio::error::basic_errors::operation_aborted, 0);
return;
}
// If we add support for reconnect in the future, we may
// want to keep the payload around until all responses
// have been written.
cli->payload_.clear();
BOOST_ASSERT(!cli->reqs_.empty());
// If the request that we have just written has no
// expected response, e.g. subscribe, we can complete the
// operation here as there is nothing to read.
if (cli->reqs_.front()->n_cmds == 0) {
cli->release_req_info(info);
cli->reqs_.pop_front();
if (!cli->reqs_.empty()) {
cli->reqs_.front()->timer.cancel_one();
}
self.complete({}, 0);
return;
}
// Waits for the response to arrive. Notice we cannot skip
// this as between async_write and async_read we
// may receive a server push.
yield cli->read_channel_.async_receive(std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
if (info->stop) {
cli->release_req_info(info);
self.complete(ec, 0);
return;
}
}
//----------------------------------------------------------------
// Notice we use the front of the queue.
BOOST_ASSERT(!cli->read_buffer_.empty());
BOOST_ASSERT(resp3::to_type(cli->read_buffer_.front()) != resp3::type::push);
BOOST_ASSERT(resp3::to_type(cli->read_buffer_.front()) != resp3::type::invalid);
BOOST_ASSERT(!cli->reqs_.empty());
// Loop reading the responses to this request.
while (cli->reqs_.front()->n_cmds != 0) {
BOOST_ASSERT(!cli->cmds_.empty());
//-----------------------------------
// Section to handle pushes in the middle of a request.
if (cli->read_buffer_.empty()) {
// Read in some data.
yield boost::asio::async_read_until(*cli->socket_, cli->make_dynamic_buffer(), "\r\n", std::move(self));
if (ec) {
cli->close();
self.complete(ec, 0);
return;
}
}
// If the next response is a push we have to hand it to
// read_push_op, wait for it to be done, and continue.
if (resp3::to_type(cli->read_buffer_.front()) == resp3::type::push) {
yield
cli->push_channel_.async_send(boost::system::error_code{}, 0, std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
// Waits for the read push op to complete.
yield cli->push_channel_.async_receive(std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
}
//-----------------------------------
yield
resp3::async_read(
*cli->socket_,
@@ -213,17 +323,16 @@ struct exec_op {
cli->reqs_.pop_front();
if (cli->n_cmds_ == 0) {
// We are done with the pipeline and can resume listening
// on the socket and send pending requests if there are
// any.
cli->wait_read_timer_.cancel_one();
if (!cli->reqs_.empty()) {
cli->wait_write_timer_.cancel_one();
// Done with the pipeline.
yield
cli->read_channel_.async_send(boost::system::error_code{}, 0, std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
} else {
// We are not done with the pipeline and can continue
// reading.
BOOST_ASSERT(!cli->reqs_.empty());
}
if (!cli->reqs_.empty()) {
cli->reqs_.front()->timer.cancel_one();
}
@@ -300,19 +409,15 @@ struct read_write_check_ping_op {
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 4> order = {}
, std::array<std::size_t, 3> order = {}
, boost::system::error_code ec0 = {}
, boost::system::error_code ec1 = {}
, boost::system::error_code ec2 = {}
, boost::system::error_code ec3 = {}
, boost::system::error_code ec4 = {})
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
// Starts the reader and writer ops.
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->writer(token);},
[this](auto token) { return cli->reader(token);},
[this](auto token) { return cli->async_idle_check(token);},
[this](auto token) { return cli->async_ping(token);}
@@ -323,23 +428,18 @@ struct read_write_check_ping_op {
switch (order[0]) {
case 0:
{
BOOST_ASSERT(ec1);
self.complete(ec1);
BOOST_ASSERT(ec0);
self.complete(ec0);
} break;
case 1:
{
BOOST_ASSERT(ec2);
self.complete(ec2);
BOOST_ASSERT(ec1);
self.complete(ec1);
} break;
case 2:
{
BOOST_ASSERT(ec3);
self.complete(ec3);
} break;
case 3:
{
BOOST_ASSERT(ec4);
self.complete(ec4);
BOOST_ASSERT(ec2);
self.complete(ec2);
} break;
default: BOOST_ASSERT(false);
}
@@ -388,6 +488,10 @@ struct run_op {
return;
}
if (!cli->reqs_.empty()) {
cli->reqs_.front()->timer.cancel_one();
}
yield cli->async_read_write_check_ping(std::move(self));
if (ec) {
self.complete(ec);
@@ -399,66 +503,6 @@ struct run_op {
}
};
template <class Conn>
struct writer_op {
Conn* cli;
boost::asio::coroutine coro;
template <class Self>
void
operator()(Self& self,
boost::system::error_code ec = {},
std::size_t n = 0)
{
reenter (coro) for (;;)
{
if (!cli->reqs_.empty()) {
BOOST_ASSERT(!cli->reqs_.empty());
BOOST_ASSERT(!cli->payload_next_.empty());
// Prepare for the next write.
cli->n_cmds_ = cli->n_cmds_next_;
cli->n_cmds_next_ = 0;
cli->payload_ = cli->payload_next_;
cli->payload_next_.clear();
cli->write_timer_.expires_after(cli->cfg_.write_timeout);
yield aedis::detail::async_write(
*cli->socket_, cli->write_timer_, boost::asio::buffer(cli->payload_),
std::move(self));
if (ec) {
cli->close();
self.complete(ec);
return;
}
if (!cli->socket_->is_open()) {
self.complete({});
return;
}
cli->payload_.clear();
BOOST_ASSERT(!cli->reqs_.empty());
if (cli->reqs_.front()->n_cmds == 0) {
// Some requests don't have a response, so their timers
// won't be canceled in the read op; we have to do it here.
cli->reqs_.front()->timer.cancel_one();
// Notice we don't have to call
// cli->wait_read_timer_.cancel_one(); as that
// operation is ongoing.
}
}
yield cli->wait_write_timer_.async_wait(std::move(self));
if (!cli->socket_->is_open()) {
// The completion has been explicitly requested.
self.complete({});
return;
}
}
}
};
template <class Conn>
struct reader_op {
Conn* cli;
@@ -473,11 +517,7 @@ struct reader_op {
reenter (coro) for (;;)
{
yield boost::asio::async_read_until(
*cli->socket_,
cli->make_dynamic_buffer(),
"\r\n",
std::move(self));
yield boost::asio::async_read_until(*cli->socket_, cli->make_dynamic_buffer(), "\r\n", std::move(self));
if (ec) {
cli->close();
self.complete(ec);
@@ -486,38 +526,61 @@ struct reader_op {
cli->last_data_ = std::chrono::steady_clock::now();
if (resp3::to_type(cli->read_buffer_.front()) == resp3::type::push) {
cli->waiting_pushes_ = 1;
cli->wait_push_timer_.cancel_one();
} else if (cli->reqs_.empty()) {
// This situation is odd. I have noticed that unsolicited
// simple-error events are sent by the server (-MISCONF)
// under certain conditions. I expect them to have type
// push so we can distinguish them from responses to
// commands, but it is a simple-error. If we are lucky
// enough to receive them when the command queue is empty
// we can treat them as server pushes, otherwise it is
// impossible to handle them properly.
cli->waiting_pushes_ = 1;
cli->wait_push_timer_.cancel_one();
// Handles unsolicited events.
if (resp3::to_type(cli->read_buffer_.front()) == resp3::type::push || cli->reqs_.empty()) {
// TODO: Pack this in an operation.
// Regarding cli->reqs_.empty() above: This situation is
// odd. I have noticed that unsolicited simple-error
// events are sent by the server (-MISCONF) under certain
// conditions. I expect them to have type push so we can
// distinguish them from responses to commands, but they
// arrive as simple-errors. If we are lucky enough to receive them
// when the command queue is empty we can treat them as
// server pushes, otherwise it is impossible to handle
// them properly.
// Hands control over to the read push op.
yield
cli->push_channel_.async_send(boost::system::error_code{}, 0, std::move(self));
if (ec) {
self.complete(ec);
return;
}
// Waits for the read push op to complete.
yield cli->push_channel_.async_receive(std::move(self));
if (ec) {
self.complete(ec);
return;
}
} else {
BOOST_ASSERT(!cli->cmds_.empty());
BOOST_ASSERT(cli->reqs_.front()->n_cmds != 0);
cli->reqs_.front()->timer.cancel_one();
}
cli->wait_read_timer_.expires_after(cli->cfg_.read_timeout);
yield cli->wait_read_timer_.async_wait(std::move(self));
if (!ec) {
cli->close();
self.complete(error::read_timeout);
return;
}
yield
cli->read_channel_.async_send(boost::system::error_code{}, 0, std::move(self));
if (ec) {
self.complete(ec);
return;
}
if (!cli->socket_->is_open()) {
cli->close();
self.complete(ec);
return;
if (!cli->socket_->is_open()) {
cli->close();
self.complete(ec);
return;
}
yield cli->read_channel_.async_receive(std::move(self));
if (ec) {
self.complete(ec);
return;
}
if (!cli->socket_->is_open()) {
cli->close();
self.complete(ec);
return;
}
}
}
}

View File

@@ -32,13 +32,13 @@ void expect_neq(T const& a, T const& b, std::string const& msg = "")
}
template <class T>
void expect_error(boost::system::error_code a, T expected = {})
void expect_error(boost::system::error_code a, T expected = {}, std::string const& msg = "")
{
if (a == expected) {
if (a)
std::cout << "Success: " << a.message() << " (" << a.category().name() << ")" << std::endl;
std::cout << "Success: " << a.message() << " (" << a.category().name() << ") " << msg << std::endl;
} else {
std::cout << "Error: " << a.message() << " (" << a.category().name() << ")" << std::endl;
std::cout << "Error: " << a.message() << " (" << a.category().name() << ") " << msg << std::endl;
exit(EXIT_FAILURE);
}
}

View File

@@ -17,6 +17,8 @@
#include "check.hpp"
// TODO: Test with subscribe that has wrong number of arguments.
// TODO: I observed that running the echo_server_client with sessions
// and 10000 messages will result in a timeout, which is wrong.
//std::cout << "aaaa " << ec.message() << " " << cmd << " " << n << std::endl;
@@ -40,14 +42,11 @@ auto print_read = [](auto cmd, auto n)
void test_resolve_error()
{
auto f = [](auto ec)
{
expect_error(ec, net::error::netdb_errors::host_not_found);
};
net::io_context ioc;
connection db(ioc);
db.async_run("Atibaia", "6379", f);
db.async_run("Atibaia", "6379", [](auto ec) {
expect_error(ec, net::error::netdb_errors::host_not_found, "test_resolve_error");
});
ioc.run();
}
@@ -55,14 +54,11 @@ void test_resolve_error()
void test_connect_error()
{
auto f = [](auto ec)
{
expect_error(ec, net::error::basic_errors::connection_refused);
};
net::io_context ioc;
connection db(ioc);
db.async_run("127.0.0.1", "1", f);
db.async_run("127.0.0.1", "1", [](auto ec) {
expect_error(ec, net::error::basic_errors::connection_refused, "test_connect_error");
});
ioc.run();
}
@@ -78,12 +74,10 @@ void test_quit()
req.push(command::quit);
db->async_exec(req, aedis::adapt(), [](auto ec, auto r){
expect_no_error(ec);
//expect_eq(w, 36UL);
//expect_eq(r, 152UL);
});
db->async_run("127.0.0.1", "6379", [](auto ec){
expect_error(ec, net::error::misc_errors::eof);
expect_error(ec, net::error::misc_errors::eof, "test_quit");
});
ioc.run();
@@ -97,7 +91,7 @@ void test_quit2()
net::io_context ioc;
auto db = std::make_shared<connection>(ioc);
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto n){
expect_error(ec, net::error::misc_errors::eof);
expect_error(ec, net::error::misc_errors::eof, "test_quit2");
});
ioc.run();
@@ -115,7 +109,7 @@ push_consumer3(std::shared_ptr<connection> db)
{
auto [ec, n] = co_await db->async_read_push(aedis::adapt(), as_tuple(net::use_awaitable));
expect_error(ec, boost::asio::error::operation_aborted);
expect_error(ec, boost::asio::experimental::channel_errc::channel_cancelled, "push_consumer3");
}
}
@@ -128,7 +122,7 @@ void test_push()
req.push(command::subscribe, "channel");
req.push(command::quit);
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){
expect_error(ec, net::error::misc_errors::eof);
expect_error(ec, net::error::misc_errors::eof, "test_push");
});
net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached);
ioc.run();
@@ -146,7 +140,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
});
auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable));
expect_error(ec, net::error::misc_errors::eof);
expect_error(ec, net::error::misc_errors::eof, "run5");
}
{
@@ -157,7 +151,7 @@ net::awaitable<void> run5(std::shared_ptr<connection> db)
});
auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable));
expect_error(ec, net::error::misc_errors::eof);
expect_error(ec, net::error::misc_errors::eof, "run5");
}
}
@@ -182,7 +176,7 @@ void test_no_push_reader1()
req.push(command::subscribe, "channel");
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){
expect_error(ec, aedis::error::read_timeout);
expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader1");
});
ioc.run();
@@ -200,7 +194,7 @@ void test_no_push_reader2()
req.push(command::subscribe);
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){
expect_error(ec, aedis::error::read_timeout);
expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader2");
});
ioc.run();
@@ -219,7 +213,7 @@ void test_no_push_reader3()
req.push(command::subscribe);
db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){
expect_error(ec, aedis::error::read_timeout);
expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader3");
});
ioc.run();
@@ -245,12 +239,38 @@ void test_idle()
});
db->async_run("127.0.0.1", "6379", [](auto ec){
expect_error(ec, aedis::error::idle_timeout);
expect_error(ec, aedis::error::idle_timeout, "test_idle");
});
ioc.run();
}
auto handler =[](auto ec, auto...)
{ std::cout << ec.message() << std::endl; };
void test_push3()
{
request req1;
req1.push(command::ping, "Message1");
request req2;
req2.push(command::subscribe, "channel");
request req3;
req3.push(command::ping, "Message2");
req3.push(command::quit);
std::tuple<std::string, std::string> resp;
net::io_context ioc;
connection db{ioc};
db.async_exec(req1, aedis::adapt(resp), handler);
db.async_exec(req2, aedis::adapt(resp), handler);
db.async_exec(req3, aedis::adapt(resp), handler);
db.async_run("127.0.0.1", "6379", handler);
ioc.run();
}
int main()
{
test_resolve_error();
@@ -258,6 +278,7 @@ int main()
test_quit();
test_quit2();
test_push();
test_push3();
test_no_push_reader1();
test_no_push_reader2();
test_no_push_reader3();

View File

@@ -37,18 +37,25 @@ example(boost::asio::ip::tcp::endpoint ep, std::string msg, int n)
}
}
int main()
int main(int argc, char* argv[])
{
try {
int sessions = 1;
int msgs = 1;
if (argc == 3) {
sessions = std::stoi(argv[1]);
msgs = std::stoi(argv[2]);
}
net::io_context ioc;
tcp::resolver resv{ioc};
auto const res = resv.resolve("127.0.0.1", "55555");
auto ep = *std::begin(res);
int n = 10000;
for (int i = 0; i < 500; ++i)
net::co_spawn(ioc, example(ep, "Some message\n", n), net::detached);
for (int i = 0; i < sessions; ++i)
net::co_spawn(ioc, example(ep, "Some message\n", msgs), net::detached);
ioc.run();
} catch (std::exception const& e) {