diff --git a/README.md b/README.md index 5bc472be..b41e7a1b 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,8 @@ void send_ping() net::io_context ioc; session s {ioc}; - s.send(ping()); + s.send(ping() + quit()); + s.disable_reconnect(); s.run(); ioc.run(); @@ -71,7 +72,14 @@ void example1() + publish("g", "A message") + exec(); - send(std::move(s)); + net::io_context ioc; + session ss {ioc}; + + ss.send(std::move(s)); + ss.disable_reconnect(); + + ss.run(); + ioc.run(); } ``` @@ -92,11 +100,12 @@ void example2() , log::level::info }; - session s {ioc, cfg, "id"}; + session ss {ioc, cfg, "id"}; - s.send(role()); + ss.send(role() + quit()); + ss.disable_reconnect(); - s.run(); + ss.run(); ioc.run(); } ``` @@ -132,7 +141,8 @@ void example3() std::cout << std::endl; }); - s.send(ping()); + s.send(ping() + quit()); + s.disable_reconnect(); s.run(); ioc.run(); diff --git a/aedis.hpp b/aedis.hpp index 37173aa0..692da9cb 100644 --- a/aedis.hpp +++ b/aedis.hpp @@ -370,6 +370,12 @@ auto lpush(std::string const& key, Iter begin, Iter end) return resp::assemble("LPUSH", {key}, begin, end); } +inline +auto quit() +{ + return resp::assemble("QUIT"); +} + inline auto multi() { @@ -671,7 +677,7 @@ private: long long pipeline_id_ = 0; instance instance_; int sentinel_idx_ = 0; - bool close_requested_ = false; + bool disable_reconnect_ = false; msg_handler_type msg_handler_ = [this](auto ec, auto const& res) { @@ -719,19 +725,12 @@ private: return; } - // Closes the connection with the sentinel and connects with the master. - ec = {}; - socket_.close(ec); - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_instance: {1}." - , id_ - , ec.message()); - return; - } + // Closes the connection with the sentinel and connects with the + // master. + close("{0}/on_instance: {1}."); - // NOTE: Call sync resolve to prevent asio from starting a new thread. + // NOTE: Call sync resolve to prevent asio from starting a new + // thread. ec = {}; auto res = resolver_.resolve(host, port, ec); if (ec) { @@ -765,7 +764,7 @@ private: , ec.message() , endpoint); - if (!close_requested_) + if (!disable_reconnect_) run(); return; @@ -814,8 +813,8 @@ private: // net::error::connection_reset // net::error::operation_aborted - close_impl(); - if (!close_requested_) + close("{0}/on_resp: {1}."); + if (!disable_reconnect_) run(); return; @@ -823,28 +822,20 @@ private: msg_handler_(ec, std::move(buffer_.res)); - if (!close_requested_) - do_read_resp(); - - if (std::empty(msg_queue_)) { - if (close_requested_) - close_impl(); + do_read_resp(); + if (std::empty(msg_queue_)) return; - } // In practive, the if condition below will always hold as we pop the // last written message as soon as the first response from a pipeline - // is received. If think the code is clearer this way. + // is received and send the next. If think the code is clearer this + // way. if (msg_queue_.front().sent) { msg_queue_.pop(); - if (std::empty(msg_queue_)) { - if (close_requested_) - close_impl(); - + if (std::empty(msg_queue_)) return; - } do_write(); } @@ -872,32 +863,17 @@ private: net::async_write( socket_ , net::buffer(msg_queue_.front().payload) , f); - msg_queue_.front().sent = true; - - // If this was the last message and the user has requested a close we - // can shutdown the socket to disable sending and receiving. - if (std::empty(msg_queue_) && close_requested_) { - boost::system::error_code ec; - socket_.shutdown(net::ip::tcp::socket::shutdown_send, ec); - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_write/shutdown: {1}." - , id_ - , ec.message()); - } - } } - void close_impl() + void close(char const* msg) { boost::system::error_code ec; socket_.close(ec); if (ec) { log::write( cfg_.log_filter , log::level::warning - , "{0}/close_imp: {1}." + , msg , id_ , ec.message()); } @@ -956,7 +932,7 @@ private: public: session( net::io_context& ioc , config cfg - , std::string id = {}) + , std::string id = "aedis") : id_(id) , cfg_ {std::move(cfg)} , resolver_ {ioc} @@ -978,7 +954,7 @@ public: auto send(std::string msg) { assert(!std::empty(msg)); - assert(!close_requested_); + assert(!disable_reconnect_); auto const max_pp_size_reached = pipeline_size_ >= cfg_.max_pipeline_size; @@ -1040,24 +1016,10 @@ public: net::async_connect(socket_, res, f); } - void close() + void disable_reconnect() { - assert(!close_requested_); - close_requested_ = true; - - // If the the message queue is empty we can close the socket - // imediately without shutdown. - if (std::empty(msg_queue_)) { - boost::system::error_code ec; - socket_.close(ec); - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/close/close: {1}." - , id_ - , ec.message()); - } - } + assert(!disable_reconnect_); + disable_reconnect_ = true; } }; diff --git a/examples.cpp b/examples.cpp index 833e90d1..116e1ec7 100644 --- a/examples.cpp +++ b/examples.cpp @@ -15,6 +15,7 @@ void send(std::string cmd) session s {ioc}; s.send(std::move(cmd)); + s.disable_reconnect(); s.run(); ioc.run(); @@ -76,9 +77,17 @@ void rpush_ex() + lrange("h") + rpush("i", i) + lrange("i") + + quit() ; - send(std::move(s)); + net::io_context ioc; + session ss {ioc}; + + ss.send(std::move(s)); + ss.disable_reconnect(); + + ss.run(); + ioc.run(); } void example1() @@ -127,9 +136,17 @@ void example1() + get("h") + auth("password") + bitcount("h") + + quit() ; - send(std::move(s)); + net::io_context ioc; + session ss {ioc}; + + ss.send(std::move(s)); + ss.disable_reconnect(); + + ss.run(); + ioc.run(); } void example2() @@ -146,11 +163,12 @@ void example2() , log::level::info }; - session s {ioc, cfg, "id"}; + session ss {ioc, cfg, "id"}; - s.send(role()); + ss.send(role() + quit()); + ss.disable_reconnect(); - s.run(); + ss.run(); ioc.run(); } @@ -175,7 +193,8 @@ void example3() std::cout << std::endl; }); - s.send(ping()); + s.send(ping() + quit()); + s.disable_reconnect(); s.run(); ioc.run(); @@ -186,7 +205,8 @@ void send_ping() net::io_context ioc; session s {ioc}; - s.send(ping()); + s.send(ping() + quit()); + s.disable_reconnect(); s.run(); ioc.run(); @@ -194,10 +214,10 @@ void send_ping() int main(int argc, char* argv[]) { - //example1(); - //example2(); - //example3(); - //rpush_ex(); + example1(); + example2(); + example3(); + rpush_ex(); send_ping(); } diff --git a/tests.cpp b/tests.cpp index 938a3eef..3096f12d 100644 --- a/tests.cpp +++ b/tests.cpp @@ -113,8 +113,10 @@ void rpush_lrange() check_size(res, 1, prefix); check_string(res.front(), "PONG", prefix); - if (i == size - 1) - ss.close(); + if (i == size - 1) { + ss.send(quit()); + ss.disable_reconnect(); + } } } ++i;