diff --git a/aedis/connection.hpp b/aedis/connection.hpp index 4630da6a..249fc42c 100644 --- a/aedis/connection.hpp +++ b/aedis/connection.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -72,17 +73,14 @@ public: */ connection(boost::asio::any_io_executor ex, config cfg = config{}) : resv_{ex} - , wait_write_timer_{ex} , ping_timer_{ex} , write_timer_{ex} - , wait_read_timer_{ex} - , wait_push_timer_{ex} + , read_channel_{ex} + , push_channel_{ex} , check_idle_timer_{ex} , cfg_{cfg} , last_data_{std::chrono::time_point::min()} { - wait_push_timer_.expires_at(std::chrono::steady_clock::time_point::max()); - wait_write_timer_.expires_at(std::chrono::steady_clock::time_point::max()); } connection(boost::asio::io_context& ioc, config cfg = config{}) @@ -228,9 +226,8 @@ public: void close() { socket_->close(); - wait_read_timer_.expires_at(std::chrono::steady_clock::now()); - wait_push_timer_.expires_at(std::chrono::steady_clock::now()); - wait_write_timer_.expires_at(std::chrono::steady_clock::now()); + read_channel_.cancel(); + push_channel_.cancel(); check_idle_timer_.expires_at(std::chrono::steady_clock::now()); ping_timer_.cancel(); for (auto& e: reqs_) { @@ -251,7 +248,6 @@ private: template friend struct detail::exec_op; template friend struct detail::runexec_op; template friend struct detail::exec_internal_op; - template friend struct detail::writer_op; template friend struct detail::connect_with_timeout_op; template friend struct detail::resolve_with_timeout_op; template friend struct detail::check_idle_op; @@ -263,19 +259,18 @@ private: bool stop = false; }; - void add_request(resp3::request const& req) + bool add_request(resp3::request const& req) { BOOST_ASSERT(!req.payload().empty()); auto const can_write = reqs_.empty(); reqs_.push_back(make_req_info(req.commands().size())); + reqs_.back()->timer.expires_at(std::chrono::steady_clock::time_point::max()); n_cmds_next_ += req.commands().size(); payload_next_ += req.payload(); for (auto cmd : req.commands()) cmds_.push(cmd.first); - if (can_write) { - BOOST_ASSERT(n_cmds_ == 0); - wait_write_timer_.cancel_one(); - } + + return can_write; } auto make_dynamic_buffer() @@ -318,16 +313,6 @@ private: >(detail::reader_op{this}, token, resv_.get_executor()); } - template - auto - writer(CompletionToken&& token = default_completion_token_type{}) - { - return boost::asio::async_compose - < CompletionToken - , void(boost::system::error_code) - >(detail::writer_op{this}, token, resv_); - } - template auto async_read_write_check_ping(CompletionToken&& token = default_completion_token_type{}) @@ -389,14 +374,15 @@ private: pool_.push_back(info); } + using channel_type = boost::asio::experimental::channel; + // IO objects boost::asio::ip::tcp::resolver resv_; std::shared_ptr socket_; - boost::asio::steady_timer wait_write_timer_; boost::asio::steady_timer ping_timer_; boost::asio::steady_timer write_timer_; - boost::asio::steady_timer wait_read_timer_; - boost::asio::steady_timer wait_push_timer_; + channel_type read_channel_; + channel_type push_channel_; boost::asio::steady_timer check_idle_timer_; // Configuration parameters. @@ -405,7 +391,6 @@ private: // Buffer used by the read operations. std::string read_buffer_; - std::size_t waiting_pushes_ = 0; std::size_t n_cmds_ = 0; std::size_t n_cmds_next_ = 0; std::string payload_; diff --git a/aedis/detail/connection_ops.hpp b/aedis/detail/connection_ops.hpp index cdc83330..64e7abf5 100644 --- a/aedis/detail/connection_ops.hpp +++ b/aedis/detail/connection_ops.hpp @@ -93,6 +93,7 @@ template struct read_push_op { Conn* cli; Adapter adapter; + std::size_t read_size; boost::asio::coroutine coro; template @@ -103,14 +104,10 @@ struct read_push_op { { reenter (coro) { - if (cli->waiting_pushes_ == 0) { - yield cli->wait_push_timer_.async_wait(std::move(self)); - if (!cli->socket_->is_open()) { - self.complete(ec, 0); - return; - } - - BOOST_ASSERT(cli->waiting_pushes_ == 1); + yield cli->push_channel_.async_receive(std::move(self)); + if (ec) { + self.complete(ec, 0); + return; } yield @@ -120,9 +117,21 @@ struct read_push_op { [adpt = adapter](resp3::node const& n, boost::system::error_code& ec) mutable { adpt(std::size_t(-1), command::invalid, n, ec);}, std::move(self)); - cli->wait_read_timer_.cancel_one(); - cli->waiting_pushes_ = 0; - self.complete(ec, n); + if (ec) { + self.complete(ec, 0); + return; + } + + read_size = 0; + + yield + cli->push_channel_.async_send(boost::system::error_code{}, 0, std::move(self)); + if (ec) { + self.complete(ec, 0); + return; + } + + self.complete(ec, read_size); return; } } @@ -146,41 +155,142 @@ struct exec_op { { reenter (coro) { - cli->add_request(*req); - // Notice we use the back of the queue. - info = cli->reqs_.back(); - info->timer.expires_at(std::chrono::steady_clock::time_point::max()); - yield info->timer.async_wait(std::move(self)); - if (info->stop) { - cli->release_req_info(info); - self.complete(ec, 0); - return; + // add_request will add the request payload to the buffer and + // return true if it can be written to the socket i.e. There + // is no ongoing request. + if (cli->add_request(*req) && cli->socket_) { + // We can proceeed and write the request to the socket. + BOOST_ASSERT(cli->n_cmds_ == 0); + info = cli->reqs_.back(); + } else { + // There is an ongoing request being processed, when the + // response to this specific request arrives, the timer + // below will be canceled, either in the end of this + // operation (if it is in the middle of a pipeline) or on + // the reader_op (if it is the first in the pipeline). + // Notice we use the back of the queue. + info = cli->reqs_.back(); + yield info->timer.async_wait(std::move(self)); + if (info->stop) { + cli->release_req_info(info); + self.complete(boost::asio::error::basic_errors::operation_aborted, 0); + return; + } } + //---------------------------------------------------------------- + // Write operation. + BOOST_ASSERT(!cli->reqs_.empty()); - if (cli->reqs_.front()->n_cmds == 0) { - // Some requests don't have response, so we have to exit - // the operation earlier. - cli->release_req_info(info); - cli->reqs_.pop_front(); - // If there is no ongoing push-read operation we can - // request the reader to proceed, otherwise we can just - // exit. - if (cli->waiting_pushes_ == 0) - cli->wait_read_timer_.cancel_one(); + // If n_cmds_ is zero there no ongoing request being + // processed so we can write. Otherwise, the payload + // corresponding to this request has already been sent in + // previous pipelines so that there is nothing to send. + if (cli->n_cmds_ == 0) { + BOOST_ASSERT(!cli->payload_next_.empty()); - self.complete({}, 0); - return; + // Copies the request to variable that won't be touched + // while async_write is suspended. + std::swap(cli->n_cmds_next_, cli->n_cmds_); + std::swap(cli->payload_next_, cli->payload_); + cli->n_cmds_next_ = 0; + cli->payload_next_.clear(); + + cli->write_timer_.expires_after(cli->cfg_.write_timeout); + yield aedis::detail::async_write( + *cli->socket_, cli->write_timer_, boost::asio::buffer(cli->payload_), + std::move(self)); + if (ec) { + cli->close(); + self.complete(ec, 0); + return; + } + + // A stop may have been requested while the write + // operation was suspended. + if (info->stop) { + self.complete(boost::asio::error::basic_errors::operation_aborted, 0); + return; + } + + // If we add support for reconnect in the future, we may + // want to keep the payload around until all responses + // have been written. + cli->payload_.clear(); + + BOOST_ASSERT(!cli->reqs_.empty()); + + // If the connection that we have just written has no + // expected response e.g. subscribe, we can complete the + // operation here as there is nothing to read. + if (cli->reqs_.front()->n_cmds == 0) { + cli->release_req_info(info); + cli->reqs_.pop_front(); + if (!cli->reqs_.empty()) { + cli->reqs_.front()->timer.cancel_one(); + } + self.complete({}, 0); + return; + } + + // Waits for the response to arrive. Notice cannot skip + // this as between and async_write and async_read we + // may receive a server push. + yield cli->read_channel_.async_receive(std::move(self)); + if (ec) { + self.complete(ec, 0); + return; + } + + if (info->stop) { + cli->release_req_info(info); + self.complete(ec, 0); + return; + } } + //---------------------------------------------------------------- + // Notice we use the front of the queue. - BOOST_ASSERT(!cli->read_buffer_.empty()); - BOOST_ASSERT(resp3::to_type(cli->read_buffer_.front()) != resp3::type::push); - BOOST_ASSERT(resp3::to_type(cli->read_buffer_.front()) != resp3::type::invalid); + BOOST_ASSERT(!cli->reqs_.empty()); + + // Loop reading the responses to this request. while (cli->reqs_.front()->n_cmds != 0) { BOOST_ASSERT(!cli->cmds_.empty()); + //----------------------------------- + // Section to handle pushes in the middle of a request. + if (cli->read_buffer_.empty()) { + // Read in some data. + yield boost::asio::async_read_until(*cli->socket_, cli->make_dynamic_buffer(), "\r\n", std::move(self)); + if (ec) { + cli->close(); + self.complete(ec, 0); + return; + } + } + + // If the next request is a push we have to handle it to + // the read_push_op wait for it to be done and continue. + + if (resp3::to_type(cli->read_buffer_.front()) == resp3::type::push) { + yield + cli->push_channel_.async_send(boost::system::error_code{}, 0, std::move(self)); + if (ec) { + self.complete(ec, 0); + return; + } + + // Waits the read push op to complete. + yield cli->push_channel_.async_receive(std::move(self)); + if (ec) { + self.complete(ec, 0); + return; + } + } + //----------------------------------- + yield resp3::async_read( *cli->socket_, @@ -213,17 +323,16 @@ struct exec_op { cli->reqs_.pop_front(); if (cli->n_cmds_ == 0) { - // We are done with the pipeline and can resumes listening - // on the socket and send pending requests if there is - // any. - cli->wait_read_timer_.cancel_one(); - if (!cli->reqs_.empty()) { - cli->wait_write_timer_.cancel_one(); + // Done with the pipeline. + yield + cli->read_channel_.async_send(boost::system::error_code{}, 0, std::move(self)); + if (ec) { + self.complete(ec, 0); + return; } - } else { - // We are not done with the pipeline and can continue - // reading. - BOOST_ASSERT(!cli->reqs_.empty()); + } + + if (!cli->reqs_.empty()) { cli->reqs_.front()->timer.cancel_one(); } @@ -300,19 +409,15 @@ struct read_write_check_ping_op { template void operator()( Self& self - , std::array order = {} + , std::array order = {} + , boost::system::error_code ec0 = {} , boost::system::error_code ec1 = {} - , boost::system::error_code ec2 = {} - , boost::system::error_code ec3 = {} - , boost::system::error_code ec4 = {}) + , boost::system::error_code ec2 = {}) { reenter (coro) { - // Starts the reader and writer ops. - yield boost::asio::experimental::make_parallel_group( - [this](auto token) { return cli->writer(token);}, [this](auto token) { return cli->reader(token);}, [this](auto token) { return cli->async_idle_check(token);}, [this](auto token) { return cli->async_ping(token);} @@ -323,23 +428,18 @@ struct read_write_check_ping_op { switch (order[0]) { case 0: { - BOOST_ASSERT(ec1); - self.complete(ec1); + BOOST_ASSERT(ec0); + self.complete(ec0); } break; case 1: { - BOOST_ASSERT(ec2); - self.complete(ec2); + BOOST_ASSERT(ec1); + self.complete(ec1); } break; case 2: { - BOOST_ASSERT(ec3); - self.complete(ec3); - } break; - case 3: - { - BOOST_ASSERT(ec4); - self.complete(ec4); + BOOST_ASSERT(ec2); + self.complete(ec2); } break; default: BOOST_ASSERT(false); } @@ -388,6 +488,10 @@ struct run_op { return; } + if (!cli->reqs_.empty()) { + cli->reqs_.front()->timer.cancel_one(); + } + yield cli->async_read_write_check_ping(std::move(self)); if (ec) { self.complete(ec); @@ -399,66 +503,6 @@ struct run_op { } }; -template -struct writer_op { - Conn* cli; - boost::asio::coroutine coro; - - template - void - operator()(Self& self, - boost::system::error_code ec = {}, - std::size_t n = 0) - { - reenter (coro) for (;;) - { - if (!cli->reqs_.empty()) { - BOOST_ASSERT(!cli->reqs_.empty()); - BOOST_ASSERT(!cli->payload_next_.empty()); - - // Prepare for the next write. - cli->n_cmds_ = cli->n_cmds_next_; - cli->n_cmds_next_ = 0; - cli->payload_ = cli->payload_next_; - cli->payload_next_.clear(); - - cli->write_timer_.expires_after(cli->cfg_.write_timeout); - yield aedis::detail::async_write( - *cli->socket_, cli->write_timer_, boost::asio::buffer(cli->payload_), - std::move(self)); - if (ec) { - cli->close(); - self.complete(ec); - return; - } - - if (!cli->socket_->is_open()) { - self.complete({}); - return; - } - - cli->payload_.clear(); - BOOST_ASSERT(!cli->reqs_.empty()); - if (cli->reqs_.front()->n_cmds == 0) { - // Some requests don't have response, so their timers - // won't be canceled on read op, we have to do it here. - cli->reqs_.front()->timer.cancel_one(); - // Notice we don't have to call - // cli->wait_read_timer_.cancel_one(); as that - // operation is ongoing. - } - } - - yield cli->wait_write_timer_.async_wait(std::move(self)); - if (!cli->socket_->is_open()) { - // The completion has been explicited requested. - self.complete({}); - return; - } - } - } -}; - template struct reader_op { Conn* cli; @@ -473,11 +517,7 @@ struct reader_op { reenter (coro) for (;;) { - yield boost::asio::async_read_until( - *cli->socket_, - cli->make_dynamic_buffer(), - "\r\n", - std::move(self)); + yield boost::asio::async_read_until(*cli->socket_, cli->make_dynamic_buffer(), "\r\n", std::move(self)); if (ec) { cli->close(); self.complete(ec); @@ -486,38 +526,61 @@ struct reader_op { cli->last_data_ = std::chrono::steady_clock::now(); - if (resp3::to_type(cli->read_buffer_.front()) == resp3::type::push) { - cli->waiting_pushes_ = 1; - cli->wait_push_timer_.cancel_one(); - } else if (cli->reqs_.empty()) { - // This situation is odd. I have noticed that unsolicited - // simple-error events are sent by the server (-MISCONF) - // under certain conditions. I expect them to have type - // push so we can distinguish them from responses to - // commands, but it is a simple-error. If we are lucky - // enough to receive them when the command queue is empty - // we can treat them as server pushes, otherwise it is - // impossible to handle them properly. - cli->waiting_pushes_ = 1; - cli->wait_push_timer_.cancel_one(); + // Handles unsolicited events. + if (resp3::to_type(cli->read_buffer_.front()) == resp3::type::push || cli->reqs_.empty()) { + // TODO: Pack this in an operation. + // Regarding cli->reqs_.empty() above: This situation is + // odd. I have noticed that unsolicited simple-error + // events are sent by the server (-MISCONF) under certain + // conditions. I expect them to have type push so we can + // distinguish them from responses to commands, but it is + // a simple-error. If we are lucky enough to receive them + // when the command queue is empty we can treat them as + // server pushes, otherwise it is impossible to handle + // them properly. + + // Handles control to the read push op. + yield + cli->push_channel_.async_send(boost::system::error_code{}, 0, std::move(self)); + if (ec) { + self.complete(ec); + return; + } + + // Waits the read push op to complete. + yield cli->push_channel_.async_receive(std::move(self)); + if (ec) { + self.complete(ec); + return; + } } else { BOOST_ASSERT(!cli->cmds_.empty()); BOOST_ASSERT(cli->reqs_.front()->n_cmds != 0); - cli->reqs_.front()->timer.cancel_one(); - } - cli->wait_read_timer_.expires_after(cli->cfg_.read_timeout); - yield cli->wait_read_timer_.async_wait(std::move(self)); - if (!ec) { - cli->close(); - self.complete(error::read_timeout); - return; - } + yield + cli->read_channel_.async_send(boost::system::error_code{}, 0, std::move(self)); + if (ec) { + self.complete(ec); + return; + } - if (!cli->socket_->is_open()) { - cli->close(); - self.complete(ec); - return; + if (!cli->socket_->is_open()) { + cli->close(); + self.complete(ec); + return; + } + + yield cli->read_channel_.async_receive(std::move(self)); + if (ec) { + self.complete(ec); + return; + } + + if (!cli->socket_->is_open()) { + cli->close(); + self.complete(ec); + return; + } } } } diff --git a/tests/check.hpp b/tests/check.hpp index 02b49dbc..22653983 100644 --- a/tests/check.hpp +++ b/tests/check.hpp @@ -32,13 +32,13 @@ void expect_neq(T const& a, T const& b, std::string const& msg = "") } template -void expect_error(boost::system::error_code a, T expected = {}) +void expect_error(boost::system::error_code a, T expected = {}, std::string const& msg = "") { if (a == expected) { if (a) - std::cout << "Success: " << a.message() << " (" << a.category().name() << ")" << std::endl; + std::cout << "Success: " << a.message() << " (" << a.category().name() << ") " << msg << std::endl; } else { - std::cout << "Error: " << a.message() << " (" << a.category().name() << ")" << std::endl; + std::cout << "Error: " << a.message() << " (" << a.category().name() << ") " << msg << std::endl; exit(EXIT_FAILURE); } } diff --git a/tests/high_level.cpp b/tests/high_level.cpp index ca6d0143..777edd14 100644 --- a/tests/high_level.cpp +++ b/tests/high_level.cpp @@ -17,6 +17,8 @@ #include "check.hpp" // TODO: Test with subscribe that has wrong number of arguments. +// TODO: I observed that running the echo_server_client with session +// and 10000 messages will result in a timeout, which is wrong. //std::cout << "aaaa " << ec.message() << " " << cmd << " " << n << std::endl; @@ -40,14 +42,11 @@ auto print_read = [](auto cmd, auto n) void test_resolve_error() { - auto f = [](auto ec) - { - expect_error(ec, net::error::netdb_errors::host_not_found); - }; - net::io_context ioc; connection db(ioc); - db.async_run("Atibaia", "6379", f); + db.async_run("Atibaia", "6379", [](auto ec) { + expect_error(ec, net::error::netdb_errors::host_not_found, "test_resolve_error"); + }); ioc.run(); } @@ -55,14 +54,11 @@ void test_resolve_error() void test_connect_error() { - auto f = [](auto ec) - { - expect_error(ec, net::error::basic_errors::connection_refused); - }; - net::io_context ioc; connection db(ioc); - db.async_run("127.0.0.1", "1", f); + db.async_run("127.0.0.1", "1", [](auto ec) { + expect_error(ec, net::error::basic_errors::connection_refused, "test_connect_error"); + }); ioc.run(); } @@ -78,12 +74,10 @@ void test_quit() req.push(command::quit); db->async_exec(req, aedis::adapt(), [](auto ec, auto r){ expect_no_error(ec); - //expect_eq(w, 36UL); - //expect_eq(r, 152UL); }); db->async_run("127.0.0.1", "6379", [](auto ec){ - expect_error(ec, net::error::misc_errors::eof); + expect_error(ec, net::error::misc_errors::eof, "test_quit"); }); ioc.run(); @@ -97,7 +91,7 @@ void test_quit2() net::io_context ioc; auto db = std::make_shared(ioc); db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto n){ - expect_error(ec, net::error::misc_errors::eof); + expect_error(ec, net::error::misc_errors::eof, "test_quit2"); }); ioc.run(); @@ -115,7 +109,7 @@ push_consumer3(std::shared_ptr db) { auto [ec, n] = co_await db->async_read_push(aedis::adapt(), as_tuple(net::use_awaitable)); - expect_error(ec, boost::asio::error::operation_aborted); + expect_error(ec, boost::asio::experimental::channel_errc::channel_cancelled, "push_consumer3"); } } @@ -128,7 +122,7 @@ void test_push() req.push(command::subscribe, "channel"); req.push(command::quit); db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){ - expect_error(ec, net::error::misc_errors::eof); + expect_error(ec, net::error::misc_errors::eof, "test_push"); }); net::co_spawn(ioc.get_executor(), push_consumer3(db), net::detached); ioc.run(); @@ -146,7 +140,7 @@ net::awaitable run5(std::shared_ptr db) }); auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable)); - expect_error(ec, net::error::misc_errors::eof); + expect_error(ec, net::error::misc_errors::eof, "run5"); } { @@ -157,7 +151,7 @@ net::awaitable run5(std::shared_ptr db) }); auto [ec] = co_await db->async_run("127.0.0.1", "6379", as_tuple(net::use_awaitable)); - expect_error(ec, net::error::misc_errors::eof); + expect_error(ec, net::error::misc_errors::eof, "run5"); } } @@ -182,7 +176,7 @@ void test_no_push_reader1() req.push(command::subscribe, "channel"); db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){ - expect_error(ec, aedis::error::read_timeout); + expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader1"); }); ioc.run(); @@ -200,7 +194,7 @@ void test_no_push_reader2() req.push(command::subscribe); db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){ - expect_error(ec, aedis::error::read_timeout); + expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader2"); }); ioc.run(); @@ -219,7 +213,7 @@ void test_no_push_reader3() req.push(command::subscribe); db->async_exec("127.0.0.1", "6379", req, aedis::adapt(), [](auto ec, auto r){ - expect_error(ec, aedis::error::read_timeout); + expect_error(ec, aedis::error::idle_timeout, "test_no_push_reader3"); }); ioc.run(); @@ -245,12 +239,38 @@ void test_idle() }); db->async_run("127.0.0.1", "6379", [](auto ec){ - expect_error(ec, aedis::error::idle_timeout); + expect_error(ec, aedis::error::idle_timeout, "test_idle"); }); ioc.run(); } +auto handler =[](auto ec, auto...) + { std::cout << ec.message() << std::endl; }; + +void test_push3() +{ + request req1; + req1.push(command::ping, "Message1"); + + request req2; + req2.push(command::subscribe, "channel"); + + request req3; + req3.push(command::ping, "Message2"); + req3.push(command::quit); + + std::tuple resp; + + net::io_context ioc; + connection db{ioc}; + db.async_exec(req1, aedis::adapt(resp), handler); + db.async_exec(req2, aedis::adapt(resp), handler); + db.async_exec(req3, aedis::adapt(resp), handler); + db.async_run("127.0.0.1", "6379", handler); + ioc.run(); +} + int main() { test_resolve_error(); @@ -258,6 +278,7 @@ int main() test_quit(); test_quit2(); test_push(); + test_push3(); test_no_push_reader1(); test_no_push_reader2(); test_no_push_reader3(); diff --git a/tools/echo_server_client.cpp b/tools/echo_server_client.cpp index 9102c119..133021ae 100644 --- a/tools/echo_server_client.cpp +++ b/tools/echo_server_client.cpp @@ -37,18 +37,25 @@ example(boost::asio::ip::tcp::endpoint ep, std::string msg, int n) } } -int main() +int main(int argc, char* argv[]) { try { + int sessions = 1; + int msgs = 1; + + if (argc == 3) { + sessions = std::stoi(argv[1]); + msgs = std::stoi(argv[2]); + } + net::io_context ioc; tcp::resolver resv{ioc}; auto const res = resv.resolve("127.0.0.1", "55555"); auto ep = *std::begin(res); - int n = 10000; - for (int i = 0; i < 500; ++i) - net::co_spawn(ioc, example(ep, "Some message\n", n), net::detached); + for (int i = 0; i < sessions; ++i) + net::co_spawn(ioc, example(ep, "Some message\n", msgs), net::detached); ioc.run(); } catch (std::exception const& e) {