From 540ac64fdd123652a88a475a3d705a1589ea5d84 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sun, 25 Oct 2020 21:30:42 +0100 Subject: [PATCH] Makes the aedis session agnostiv in regard to the tcp stream. --- aedis.hpp | 90 ++++++++++++++++++++++++++++++++++------------------ examples.cpp | 16 +++++----- tests.cpp | 4 +-- 3 files changed, 69 insertions(+), 41 deletions(-) diff --git a/aedis.hpp b/aedis.hpp index 2d48441e..94f67533 100644 --- a/aedis.hpp +++ b/aedis.hpp @@ -154,8 +154,9 @@ std::size_t get_length(char const* p) // The parser supports up to 5 levels of nested structures. The first // element in the sizes stack is a sentinel and must be different from // 1. +template struct read_op { - tcp::socket& socket; + AsyncReadStream& socket; resp::buffer* buffer_ = nullptr; int start = 1; int depth = 0; @@ -232,13 +233,20 @@ struct read_op { } }; -template -auto async_read(tcp::socket& s, resp::buffer* buffer, CompletionToken&& token) +template < + class AsyncReadStream, + class CompletionToken> +auto async_read( + AsyncReadStream& stream, + resp::buffer* buffer, + CompletionToken&& token) { return net::async_compose < CompletionToken , void(boost::system::error_code) - >(read_op {s, buffer}, token, s); + >(read_op {stream, buffer}, + token, + stream); } } @@ -618,47 +626,67 @@ struct instance { std::string name; }; -struct instance_op { - enum class states +template +class sentinel_op { +private: + enum class op_state { starting , writing - , waiting }; + , waiting + }; - tcp::socket& socket_; - resp::buffer* buffer_; - instance* instance_; - states state_ {states::starting}; - std::string cmd_; + struct impl { + AsyncReadStream& stream; + resp::buffer buffer; + instance* inst; + op_state opstate {op_state::starting}; + std::string cmd; + + impl(AsyncReadStream& s, instance* i) + : stream {s} + , inst {i} + {} + }; + + std::shared_ptr impl_; + +public: + sentinel_op( + AsyncReadStream& stream, + instance* inst) + : impl_(std::make_shared(stream, inst)) + { + } template void operator()( Self& self , boost::system::error_code ec = {} , std::size_t n = 0) { - switch (state_) { - case states::starting: + switch (impl_->opstate) { + case op_state::starting: { - cmd_ = sentinel("get-master-addr-by-name", instance_->name); - state_ = states::writing; - net::async_write( socket_ - , net::buffer(cmd_) + impl_->cmd = sentinel("get-master-addr-by-name", impl_->inst->name); + impl_->opstate = op_state::writing; + net::async_write( impl_->stream + , net::buffer(impl_->cmd) , std::move(self)); } break; - case states::writing: + case op_state::writing: { if (ec) return self.complete(ec); - state_ = states::waiting; - resp::async_read(socket_, buffer_, std::move(self)); + impl_->opstate = op_state::waiting; + resp::async_read(impl_->stream, &impl_->buffer, std::move(self)); } break; - case states::waiting: + case op_state::waiting: { - auto n = std::size(buffer_->res); + auto n = std::size(impl_->buffer.res); if (n > 1) { - instance_->host = buffer_->res[0]; - instance_->port = buffer_->res[1]; + impl_->inst->host = impl_->buffer.res[0]; + impl_->inst->port = impl_->buffer.res[1]; } self.complete(ec); } break; @@ -669,19 +697,19 @@ struct instance_op { } }; -template +template auto -async_get_instance( tcp::socket& s - , resp::buffer* p +async_get_instance( AsyncReadStream& s , instance* p2 , CompletionToken&& token) { return net::async_compose < CompletionToken , void(boost::system::error_code) - >(instance_op {s, p, p2}, token, s); + >(sentinel_op {s, p2}, token, s); } +template class session { public: using on_conn_handler_type = std::function; @@ -708,7 +736,7 @@ private: std::string id_; config cfg_; ip::tcp::resolver resolver_; - tcp::socket socket_; + AsyncReadStream socket_; net::steady_timer timer_; resp::buffer buffer_; @@ -966,7 +994,7 @@ private: { on_instance(instance_.host, instance_.port, ec); }; instance_.name = cfg_.name; - async_get_instance(socket_, &buffer_, &instance_, std::move(g)); + async_get_instance(socket_, &instance_, std::move(g)); } public: diff --git a/examples.cpp b/examples.cpp index 43c80a18..f42c889f 100644 --- a/examples.cpp +++ b/examples.cpp @@ -12,7 +12,7 @@ using namespace aedis; void send(std::string cmd) { net::io_context ioc; - session s {ioc}; + session s {ioc}; s.send(std::move(cmd)); s.disable_reconnect(); @@ -81,7 +81,7 @@ void rpush_ex() ; net::io_context ioc; - session ss {ioc}; + session ss {ioc}; ss.send(std::move(s)); ss.disable_reconnect(); @@ -144,7 +144,7 @@ void example1() ; net::io_context ioc; - session ss {ioc}; + session ss {ioc}; ss.send(std::move(s)); ss.disable_reconnect(); @@ -157,7 +157,7 @@ void example2() { net::io_context ioc; - session::config cfg + session::config cfg { { "127.0.0.1", "26377" , "127.0.0.1", "26378" , "127.0.0.1", "26379"} // Sentinel addresses @@ -167,7 +167,7 @@ void example2() , log::level::info }; - session ss {ioc, cfg, "id"}; + session ss {ioc, cfg, "id"}; ss.send(role() + quit()); ss.disable_reconnect(); @@ -179,7 +179,7 @@ void example2() void example3() { net::io_context ioc; - session s {ioc}; + session s {ioc}; s.set_on_conn_handler([]() { std::cout << "Connected" << std::endl; @@ -207,7 +207,7 @@ void example3() void send_ping() { net::io_context ioc; - session s {ioc}; + session s {ioc}; s.send(ping() + quit()); s.disable_reconnect(); @@ -219,7 +219,7 @@ void send_ping() void psubscribe_ex() { net::io_context ioc; - session s {ioc}; + session s {ioc}; s.send(psubscribe({"__keyevent@0__:rpush"})); s.disable_reconnect(); diff --git a/tests.cpp b/tests.cpp index 3096f12d..ddae8833 100644 --- a/tests.cpp +++ b/tests.cpp @@ -54,7 +54,7 @@ void check_equal( void rpush_lrange() { - session::config cfg + session::config cfg { {"127.0.0.1", "26379"} , "mymaster" , "master" @@ -63,7 +63,7 @@ void rpush_lrange() }; net::io_context ioc; - session ss {ioc, cfg}; + session ss {ioc, cfg}; std::array a {"a1", "a2", "a3"};