diff --git a/README.md b/README.md index 97596f55..c3da94e7 100644 --- a/README.md +++ b/README.md @@ -11,10 +11,10 @@ supported as well. ### Basics -This is the simplest example possible. +Below a basic example ```cpp -awaitable example1() +net::awaitable example1() { auto ex = co_await this_coro::executor; @@ -24,9 +24,6 @@ awaitable example1() tcp_socket socket {ex}; co_await async_connect(socket, r); - std::list list - {"one" ,"two", "three"}; - std::map map { {{"Name"}, {"Marcelo"}} , {{"Education"}, {"Physics"}} @@ -34,15 +31,9 @@ awaitable example1() }; resp::pipeline p; - p.rpush("list", list); - p.lrange("list"); - p.del("list"); p.hset("map", map); p.hincrby("map", "Age", 40); p.hmget("map", {"Name", "Education", "Job"}); - p.hvals("map"); - p.hlen("map"); - p.hgetall("map"); p.quit(); co_await async_write(socket, buffer(p.payload)); diff --git a/examples/example.cpp b/examples/example.cpp index 4d4bdb1a..47b0e0cc 100644 --- a/examples/example.cpp +++ b/examples/example.cpp @@ -27,9 +27,6 @@ net::awaitable example1() tcp_socket socket {ex}; co_await async_connect(socket, r); - std::list list - {"one" ,"two", "three"}; - std::map map { {{"Name"}, {"Marcelo"}} , {{"Education"}, {"Physics"}} @@ -37,15 +34,9 @@ net::awaitable example1() }; resp::pipeline p; - p.rpush("list", list); - p.lrange("list"); - p.del("list"); p.hset("map", map); p.hincrby("map", "Age", 40); p.hmget("map", {"Name", "Education", "Job"}); - p.hvals("map"); - p.hlen("map"); - p.hgetall("map"); p.quit(); co_await async_write(socket, buffer(p.payload)); diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index 88ea883b..6a708c4c 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -520,31 +520,6 @@ public: } -namespace log -{ - -enum class level -{ emerg -, alert -, crit -, err -, warning -, notice -, info -, debug -}; - -template -void write(level filter, level ll, char const* fmt, Args const& ... args) -{ - if (ll > filter) - return; - - //std::clog << std::format(fmt, args...) << std::endl; -} - -} - struct instance { std::string host; std::string port; @@ -701,312 +676,5 @@ auto async_get_instance2( stream); } -template -class session { -public: - using on_conn_handler_type = std::function; - - using msg_handler_type = - std::function)>; - - using sentinel_config = typename sentinel_op2::config; - - struct config { - using sentinel_config = typename sentinel_op2::config; - sentinel_config sentinel; - int max_pipeline_size {256}; - log::level log_filter {log::level::debug}; - }; - -private: - struct queue_item { - std::string payload; - bool sent; - }; - - std::string id_; - config cfg_; - ip::tcp::resolver resolver_; - AsyncReadStream stream_; - - resp::buffer buffer_; - resp::response res_; - std::queue msg_queue_; - int pipeline_size_ = 0; - long long pipeline_id_ = 0; - instance instance_; - bool disable_reconnect_ = false; - - msg_handler_type msg_handler_ = [this](auto ec, auto const& res) - { - if (ec) { - log::write( cfg_.log_filter - , log::level::debug - , "{0}/msg_handler: {1}." - , id_ - , ec.message()); - } - - std::copy( std::cbegin(res) - , std::cend(res) - , std::ostream_iterator(std::cout, " ")); - - std::cout << std::endl; - }; - - - on_conn_handler_type conn_handler_ = [](){}; - - void do_read_resp() - { - res_.clear(); - - auto f = [this](auto const& ec) - { on_resp(ec); }; - - resp::async_read( - stream_, - buffer_, - res_, - std::move(f)); - } - - void - on_instance( std::string const& host - , std::string const& port - , boost::system::error_code ec) - { - buffer_.clear(); - res_.clear(); - - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_instance: {1}. Endpoint: {2}" - , id_ - , ec.message()); - return; - } - - // Closes the connection with the sentinel and connects with the - // master. - close("{0}/on_instance: {1}."); - - // NOTE: Call sync resolve to prevent asio from starting a new - // thread. - ec = {}; - auto res = resolver_.resolve(host, port, ec); - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_instance: {1}." - , id_ - , ec.message()); - return; - } - - do_connect(res); - } - - void do_connect(net::ip::tcp::resolver::results_type res) - { - auto f = [this](auto ec, auto iter) - { on_connect(ec, iter); }; - - net::async_connect(stream_, res, f); - } - - void on_connect( boost::system::error_code ec - , net::ip::tcp::endpoint const& endpoint) - { - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_connect: {1}. Endpoint: {2}" - , id_ - , ec.message() - , endpoint); - - if (!disable_reconnect_) - run(); - - return; - } - - log::write( cfg_.log_filter - , log::level::info - , "{0}/Success connecting to redis instance {1}" - , id_ - , endpoint); - - do_read_resp(); - - // Consumes any messages that have been eventually posted while the - // connection was not established, or consumes msgs when a - // connection to redis is restablished. - if (!std::empty(msg_queue_)) { - log::write( cfg_.log_filter - , log::level::debug - , "{0}/on_connect: Number of messages {1}" - , id_ - , std::size(msg_queue_)); - do_write(); - } - - // Calls user callback to inform a successfull connect to redis. - // It may wish to start sending some commands. - // - // Since this callback may call the send member function on this - // object, we have to call it AFTER the write operation above, - // otherwise the message will be sent twice. - conn_handler_(); - } - - void on_resp(boost::system::error_code ec) - { - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_resp: {1}." - , id_ - , ec.message()); - - // Some of the possible errors are. - // net::error::eof - // net::error::connection_reset - // net::error::operation_aborted - - close("{0}/on_resp: {1}."); - if (!disable_reconnect_) - run(); - - return; - } - - msg_handler_(ec, std::move(res_.res)); - - do_read_resp(); - - if (std::empty(msg_queue_)) - return; - - // In practive, the if condition below will always hold as we pop the - // last written message as soon as the first response from a pipeline - // is received and send the next. If think the code is clearer this - // way. - if (msg_queue_.front().sent) { - msg_queue_.pop(); - - if (std::empty(msg_queue_)) - return; - - do_write(); - } - } - - void on_write(boost::system::error_code ec, std::size_t n) - { - if (ec) { - log::write( cfg_.log_filter - , log::level::info - , "{0}/on_write: {1}." - , id_, ec.message()); - return; - } - } - - void do_write() - { - auto f = [this](auto ec, auto n) - { on_write(ec, n); }; - - assert(!std::empty(msg_queue_)); - assert(!std::empty(msg_queue_.front().payload)); - - net::async_write( stream_ - , net::buffer(msg_queue_.front().payload) - , f); - msg_queue_.front().sent = true; - } - - void close(char const* msg) - { - boost::system::error_code ec; - stream_.close(ec); - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , msg - , id_ - , ec.message()); - } - } - -public: - session( net::io_context& ioc - , config cfg - , std::string id = "aedis") - : id_(id) - , cfg_ {std::move(cfg)} - , resolver_ {ioc} - , stream_ {ioc} - { - if (cfg_.max_pipeline_size < 1) - cfg_.max_pipeline_size = 1; - } - - session(net::io_context& ioc) : session {ioc, {}, {}} { } - - void set_on_conn_handler(on_conn_handler_type f) - { conn_handler_ = std::move(f);}; - - void set_msg_handler(msg_handler_type f) - { msg_handler_ = std::move(f);}; - - auto send(std::string msg) - { - assert(!std::empty(msg)); - - auto const max_pp_size_reached = - pipeline_size_ >= cfg_.max_pipeline_size; - - if (max_pp_size_reached) - pipeline_size_ = 0; - - auto const is_empty = std::empty(msg_queue_); - - // When std::size(msg_queue_) == 1 we know the message in the back of - // queue has already been sent and we are waiting for a reponse, we - // cannot pipeline in this case. - if (is_empty || std::size(msg_queue_) == 1 || max_pp_size_reached) { - msg_queue_.push({std::move(msg), false}); - ++pipeline_id_; - } else { - msg_queue_.back().payload += msg; // Uses pipeline. - ++pipeline_size_; - } - - if (is_empty && stream_.is_open()) - do_write(); - - return pipeline_id_; - } - - void run() - { - auto f = [this](auto ec) - { on_instance(instance_.host, instance_.port, ec); }; - - async_get_instance2(stream_, cfg_.sentinel, instance_, f); - } - - void disable_reconnect() - { - assert(!disable_reconnect_); - disable_reconnect_ = true; - } -}; - } diff --git a/tests/general.cpp b/tests/general.cpp index 9a84370b..efcded1d 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -130,18 +130,6 @@ struct test_handler { } }; -void send(std::string cmd) -{ - net::io_context ioc; - session s {ioc}; - - s.send(std::move(cmd)); - s.disable_reconnect(); - - s.run(); - ioc.run(); -} - net::awaitable offline() { // Redis answer - Expected vector. @@ -167,7 +155,6 @@ net::awaitable offline() int main(int argc, char* argv[]) { - //send(ping()); net::io_context ioc {1}; co_spawn(ioc, offline(), net::detached); co_spawn(ioc, test1(), net::detached);