diff --git a/CMakeLists.txt b/CMakeLists.txt index c97e0213..2527f70b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,7 +21,7 @@ add_library(aedis INTERFACE) add_library(aedis::aedis ALIAS aedis) target_link_libraries(aedis INTERFACE fmt::fmt Boost::system) -target_compile_features(aedis INTERFACE cxx_std_17) +target_compile_features(aedis INTERFACE cxx_std_20) target_include_directories(aedis INTERFACE $ diff --git a/aedis.hpp b/aedis.hpp index 0713cb75..a36319b5 100644 --- a/aedis.hpp +++ b/aedis.hpp @@ -659,6 +659,14 @@ struct instance { std::string name; }; +inline +auto operator==(instance const& a, instance const& b) +{ + return a.host == b.host + && a.port == b.port + && a.name == b.name; +} + // Still on development. template class sentinel_op2 { @@ -678,7 +686,7 @@ private: }; struct impl { - AsyncReadStream stream; + AsyncReadStream& stream; resp::buffer buffer; resp::response res; instance* inst; @@ -687,8 +695,10 @@ private: config cfg; ip::tcp::resolver resv; - impl(config c, instance* i) - : inst(i) + impl(AsyncReadStream& s, config c, instance* i) + : stream(s) + , resv(s.get_executor()) + , inst(i) , cfg(c) {} }; @@ -696,8 +706,8 @@ private: std::shared_ptr impl_; public: - sentinel_op2(config cfg, instance* inst) - : impl_(std::make_shared(cfg, inst)) + sentinel_op2(AsyncReadStream& stream, config cfg, instance* inst) + : impl_(std::make_shared(stream, cfg, inst)) { auto const n = std::size(cfg.sentinels); if (n == 0 || (n % 2 != 0)) @@ -713,8 +723,8 @@ public: case op_state::on_connect: { auto const n = std::size(impl_->cfg.sentinels) / 2; - unsigned i = 0; + unsigned i = 0; for (i = 0; i < n; ++i) { auto const res = impl_->resv .resolve( impl_->cfg.sentinels[2 * i + 0] @@ -742,6 +752,7 @@ public: // std::swap(cfg.sentinels[0], cfg.sentinels[2 * i + 0]); // std::swap(cfg.sentinels[1], cfg.sentinels[2 * i + 1]); + impl_->inst->name = impl_->cfg.name; impl_->cmd = sentinel("get-master-addr-by-name", impl_->inst->name); impl_->opstate = op_state::on_write; net::async_write( impl_->stream @@ -775,101 +786,21 @@ public: } } }; -template -class sentinel_op { -public: - struct config { - // A list of redis sentinels e.g. ip1 port1 ip2 port2 ... - std::vector sentinels {"127.0.0.1", "26379"}; - std::string name {"mymaster"}; - std::string role {"master"}; - }; -private: - enum class op_state - { starting - , writing - , waiting - }; - - struct impl { - AsyncReadStream& stream; - resp::buffer buffer; - resp::response res; - instance* inst; - op_state opstate {op_state::starting}; - std::string cmd; - - impl(AsyncReadStream& s, instance* i) - : stream {s} - , inst {i} - {} - }; - - std::shared_ptr impl_; - -public: - sentinel_op( - AsyncReadStream& stream, - instance* inst) - : impl_(std::make_shared(stream, inst)) - { - } - - template - void operator()( Self& self - , boost::system::error_code ec = {} - , std::size_t n = 0) - { - switch (impl_->opstate) { - case op_state::starting: - { - impl_->cmd = sentinel("get-master-addr-by-name", impl_->inst->name); - impl_->opstate = op_state::writing; - net::async_write( impl_->stream - , net::buffer(impl_->cmd) - , std::move(self)); - } break; - case op_state::writing: - { - if (ec) - return self.complete(ec); - - impl_->opstate = op_state::waiting; - - resp::async_read( - impl_->stream, - impl_->buffer, - impl_->res, - std::move(self)); - - } break; - case op_state::waiting: - { - auto n = std::size(impl_->res.res); - if (n > 1) { - impl_->inst->host = impl_->res.res[0]; - impl_->inst->port = impl_->res.res[1]; - } - self.complete(ec); - } break; - default: - { - } - } - } -}; - -template -auto -async_get_instance( AsyncReadStream& s - , instance* p2 - , CompletionToken&& token) +template +auto async_get_instance2( + AsyncReadStream& stream, + typename sentinel_op2::config cfg, + instance& inst, + CompletionToken&& token) { return net::async_compose < CompletionToken , void(boost::system::error_code) - >(sentinel_op {s, p2}, token, s); + >(sentinel_op2 {stream, cfg, &inst}, + token, + stream); } template @@ -881,10 +812,10 @@ public: std::function)>; - using sentinel_config = typename sentinel_op::config; + using sentinel_config = typename sentinel_op2::config; struct config { - using sentinel_config = typename sentinel_op::config; + using sentinel_config = typename sentinel_op2::config; sentinel_config sentinel; int max_pipeline_size {256}; log::level log_filter {log::level::debug}; @@ -901,14 +832,12 @@ private: ip::tcp::resolver resolver_; AsyncReadStream stream_; - net::steady_timer timer_; resp::buffer buffer_; resp::response res_; std::queue msg_queue_; int pipeline_size_ = 0; long long pipeline_id_ = 0; instance instance_; - int sentinel_idx_ = 0; bool disable_reconnect_ = false; msg_handler_type msg_handler_ = [this](auto ec, auto const& res) @@ -1116,56 +1045,6 @@ private: } } - void - on_sentinel_conn( boost::system::error_code ec - , net::ip::tcp::endpoint const& endpoint) - { - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_sentinel_conn: {1}. Endpoint: {2}" - , id_ - , ec.message() - , endpoint); - - // Ask the next sentinel only if we did not try them all yet. - ++sentinel_idx_; - if ((2 * sentinel_idx_) == std::size(cfg_.sentinel.sentinels)) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/No sentinel knows the redis instance address." - , id_); - - return; - } - - run(); // Tries the next sentinel. - return; - } - - // The redis documentation recommends to put the first sentinel that - // replies in the start of the list. See - // https://redis.io/topics/sentinel-clients - if (sentinel_idx_ != 0) { - auto const r = sentinel_idx_; - std::swap(cfg_.sentinel.sentinels[0], cfg_.sentinel.sentinels[2 * r]); - std::swap(cfg_.sentinel.sentinels[1], cfg_.sentinel.sentinels[2 * r + 1]); - sentinel_idx_ = 0; - } - - log::write( cfg_.log_filter - , log::level::info - , "{0}/Success connecting to sentinel {1}" - , id_ - , endpoint); - - auto g = [this](auto ec) - { on_instance(instance_.host, instance_.port, ec); }; - - instance_.name = cfg_.sentinel.name; - async_get_instance(stream_, &instance_, std::move(g)); - } - public: session( net::io_context& ioc , config cfg @@ -1174,7 +1053,6 @@ public: , cfg_ {std::move(cfg)} , resolver_ {ioc} , stream_ {ioc} - , timer_ {ioc, std::chrono::steady_clock::time_point::max()} { if (cfg_.max_pipeline_size < 1) cfg_.max_pipeline_size = 1; @@ -1219,37 +1097,10 @@ public: void run() { - auto const n = std::size(cfg_.sentinel.sentinels); + auto f = [this](auto ec) + { on_instance(instance_.host, instance_.port, ec); }; - if (n == 0 || (n % 2 != 0)) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/run: Incompatible sentinels array size: {1}" - , id_, n); - return; - } - - auto const r = sentinel_idx_; - - boost::system::error_code ec; - auto res = resolver_ - .resolve( cfg_.sentinel.sentinels[2 * r] - , cfg_.sentinel.sentinels[2 * r + 1] - , ec); - - if (ec) { - log::write( cfg_.log_filter - , log::level::warning - , "{0}/run: Can't resolve sentinel: {1}." - , id_ - , ec.message()); - return; - } - - auto f = [this](auto ec, auto iter) - { on_sentinel_conn(ec, iter); }; - - net::async_connect(stream_, res, f); + async_get_instance2(stream_, cfg_.sentinel, instance_, f); } void disable_reconnect() diff --git a/examples.cpp b/examples.cpp index c36d6738..4e361792 100644 --- a/examples.cpp +++ b/examples.cpp @@ -140,6 +140,20 @@ awaitable example4(tcp::resolver::results_type const& r) } } +awaitable example5() +{ + tcp_socket socket {co_await this_coro::executor}; + + sentinel_op2::config cfg + { {"127.0.0.1", "26379"} + , {"mymaster"} + , {"master"} + }; + + instance inst; + co_await async_get_instance2(socket, cfg, inst); +} + int main() { io_context ioc {1}; @@ -150,6 +164,7 @@ int main() co_spawn(ioc, example2(r), detached); co_spawn(ioc, example3(r), detached); co_spawn(ioc, example4(r), detached); + co_spawn(ioc, example5(), detached); ioc.run(); } diff --git a/tests.cpp b/tests.cpp index fc41d1f3..f8e610b3 100644 --- a/tests.cpp +++ b/tests.cpp @@ -207,10 +207,33 @@ void send(std::string cmd) ioc.run(); } -int main(int argc, char* argv[]) +void test5() { - rpush_lrange(); + net::io_context ioc {1}; + tcp::socket socket {ioc}; + sentinel_op2::config cfg + { {"127.0.0.1", "26377", "127.0.0.1", "26378", "127.0.0.1", "26379"} + , {"mymaster"} + , {"master"} + }; + + instance inst; + auto f = [&](auto ec) + { + instance expected {"127.0.0.1", "6379", "mymaster"}; + if (inst == expected) + std::cout << "Success: async_get_instance2" << std::endl; + else + std::cout << "Error: async_get_instance2" << std::endl; + }; + + async_get_instance2(socket, cfg, inst, f); + ioc.run(); +} + +int offline() +{ // Redis answer - Expected vector. std::vector>> payloads { {{"+OK\r\n"}, {"OK"}} @@ -230,7 +253,13 @@ int main(int argc, char* argv[]) else std::cout << "Success: Offline tests." << std::endl; } +} +int main(int argc, char* argv[]) +{ + //rpush_lrange(); + //test5(); + //offline(); send(ping()); }