diff --git a/examples/cpp20_chat_room.cpp b/examples/cpp20_chat_room.cpp index 489e5952..24c4a025 100644 --- a/examples/cpp20_chat_room.cpp +++ b/examples/cpp20_chat_room.cpp @@ -19,11 +19,12 @@ namespace net = boost::asio; using stream_descriptor = net::deferred_t::as_default_on_t; -using connection = net::deferred_t::as_default_on_t; using signal_set = net::deferred_t::as_default_on_t; using boost::redis::request; using boost::redis::generic_response; using boost::redis::config; +using boost::redis::connection; +using boost::redis::ignore; using net::redirect_error; using net::use_awaitable; using boost::system::error_code; @@ -41,7 +42,7 @@ receiver(std::shared_ptr conn) -> net::awaitable while (conn->will_reconnect()) { // Subscribe to channels. - co_await conn->async_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); // Loop reading Redis push messages. for (generic_response resp;;) { @@ -66,7 +67,7 @@ auto publisher(std::shared_ptr in, std::shared_ptrasync_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); msg.erase(0, n); } } diff --git a/examples/cpp20_containers.cpp b/examples/cpp20_containers.cpp index ec2ee53b..dfedd82e 100644 --- a/examples/cpp20_containers.cpp +++ b/examples/cpp20_containers.cpp @@ -20,7 +20,7 @@ using boost::redis::response; using boost::redis::ignore_t; using boost::redis::ignore; using boost::redis::config; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; void print(std::map const& cont) { @@ -47,7 +47,7 @@ auto store(std::shared_ptr conn) -> net::awaitable req.push_range("RPUSH", "rpush-key", vec); req.push_range("HSET", "hset-key", map); - co_await conn->async_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); } auto hgetall(std::shared_ptr conn) -> net::awaitable @@ -60,7 +60,7 @@ auto hgetall(std::shared_ptr conn) -> net::awaitable response> resp; // Executes the request and reads the response. - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); print(std::get<0>(resp).value()); } @@ -81,7 +81,7 @@ auto transaction(std::shared_ptr conn) -> net::awaitable response>, std::optional>> // exec > resp; - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); print(std::get<0>(std::get<3>(resp).value()).value().value()); print(std::get<1>(std::get<3>(resp).value()).value().value()); diff --git a/examples/cpp20_echo_server.cpp b/examples/cpp20_echo_server.cpp index 6d2691fb..9b637240 100644 --- a/examples/cpp20_echo_server.cpp +++ b/examples/cpp20_echo_server.cpp @@ -18,11 +18,11 @@ namespace net = boost::asio; using tcp_socket = net::deferred_t::as_default_on_t; using tcp_acceptor = net::deferred_t::as_default_on_t; using signal_set = net::deferred_t::as_default_on_t; -using connection = net::deferred_t::as_default_on_t; using boost::redis::request; using boost::redis::response; using boost::redis::config; using boost::system::error_code; +using boost::redis::connection; using namespace std::chrono_literals; auto echo_server_session(tcp_socket socket, std::shared_ptr conn) -> net::awaitable @@ -33,7 +33,7 @@ auto echo_server_session(tcp_socket socket, std::shared_ptr conn) -> for (std::string buffer;;) { auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n"); req.push("PING", buffer); - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); co_await net::async_write(socket, net::buffer(std::get<0>(resp).value())); std::get<0>(resp).value().clear(); req.clear(); diff --git a/examples/cpp20_intro.cpp b/examples/cpp20_intro.cpp index 3526950d..195122cf 100644 --- a/examples/cpp20_intro.cpp +++ b/examples/cpp20_intro.cpp @@ -17,8 +17,7 @@ namespace net = boost::asio; using boost::redis::request; using boost::redis::response; using boost::redis::config; -using boost::redis::logger; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; // Called from the main function (see main.cpp) auto co_main(config cfg) -> net::awaitable @@ -34,7 +33,7 @@ auto co_main(config cfg) -> net::awaitable response resp; // Executes the request. - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); std::cout << "PING: " << std::get<0>(resp).value() << std::endl; diff --git a/examples/cpp20_intro_tls.cpp b/examples/cpp20_intro_tls.cpp index 23b491e1..b911af27 100644 --- a/examples/cpp20_intro_tls.cpp +++ b/examples/cpp20_intro_tls.cpp @@ -18,7 +18,7 @@ using boost::redis::request; using boost::redis::response; using boost::redis::config; using boost::redis::logger; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; auto verify_certificate(bool, net::ssl::verify_context&) -> bool { @@ -45,7 +45,7 @@ auto co_main(config cfg) -> net::awaitable conn->next_layer().set_verify_mode(net::ssl::verify_peer); conn->next_layer().set_verify_callback(verify_certificate); - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); std::cout << "Response: " << std::get<0>(resp).value() << std::endl; diff --git a/examples/cpp20_json.cpp b/examples/cpp20_json.cpp index 063614b1..d0c6423c 100644 --- a/examples/cpp20_json.cpp +++ b/examples/cpp20_json.cpp @@ -29,7 +29,7 @@ using boost::redis::request; using boost::redis::response; using boost::redis::ignore_t; using boost::redis::config; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; // Struct that will be stored in Redis using json serialization. struct user { @@ -64,7 +64,7 @@ auto co_main(config cfg) -> net::awaitable response resp; - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); // Prints the first ping diff --git a/examples/cpp20_protobuf.cpp b/examples/cpp20_protobuf.cpp index 32e92396..75eb8fd2 100644 --- a/examples/cpp20_protobuf.cpp +++ b/examples/cpp20_protobuf.cpp @@ -25,7 +25,7 @@ using boost::redis::response; using boost::redis::operation; using boost::redis::ignore_t; using boost::redis::config; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; // The protobuf type described in examples/person.proto using tutorial::person; @@ -76,7 +76,7 @@ net::awaitable co_main(config cfg) response resp; // Sends the request and receives the response. - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); conn->cancel(); std::cout diff --git a/examples/cpp20_resolve_with_sentinel.cpp b/examples/cpp20_resolve_with_sentinel.cpp index 35bcf686..8401cd1e 100644 --- a/examples/cpp20_resolve_with_sentinel.cpp +++ b/examples/cpp20_resolve_with_sentinel.cpp @@ -19,7 +19,7 @@ using boost::redis::response; using boost::redis::ignore_t; using boost::redis::config; using boost::redis::address; -using connection = boost::asio::use_awaitable_t<>::as_default_on_t; +using boost::redis::connection; auto redir(boost::system::error_code& ec) { return net::redirect_error(net::use_awaitable, ec); } diff --git a/examples/cpp20_streams.cpp b/examples/cpp20_streams.cpp index 4eca3050..f0939756 100644 --- a/examples/cpp20_streams.cpp +++ b/examples/cpp20_streams.cpp @@ -25,7 +25,7 @@ using boost::redis::config; using boost::redis::generic_response; using boost::redis::operation; using boost::redis::request; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; using signal_set = net::deferred_t::as_default_on_t; auto stream_reader(std::shared_ptr conn) -> net::awaitable @@ -39,7 +39,7 @@ auto stream_reader(std::shared_ptr conn) -> net::awaitable for (;;) { req.push("XREAD", "BLOCK", "0", "STREAMS", "test-topic", stream_id); - co_await conn->async_exec(req, resp); + co_await conn->async_exec(req, resp, net::deferred); // std::cout << "Response: "; // for (int i = 0; i < resp->value().size(); ++i) { diff --git a/examples/cpp20_subscriber.cpp b/examples/cpp20_subscriber.cpp index 1753095f..f7c1a4b3 100644 --- a/examples/cpp20_subscriber.cpp +++ b/examples/cpp20_subscriber.cpp @@ -24,8 +24,9 @@ using boost::redis::request; using boost::redis::generic_response; using boost::redis::logger; using boost::redis::config; +using boost::redis::ignore; using boost::system::error_code; -using connection = net::deferred_t::as_default_on_t; +using boost::redis::connection; using signal_set = net::deferred_t::as_default_on_t; /* This example will subscribe and read pushes indefinitely. @@ -54,7 +55,7 @@ receiver(std::shared_ptr conn) -> net::awaitable while (conn->will_reconnect()) { // Reconnect to channels. - co_await conn->async_exec(req); + co_await conn->async_exec(req, ignore, net::deferred); // Loop reading Redis pushs messages. for (generic_response resp;;) { diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 4ea560de..db226761 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -14,10 +14,10 @@ #include #include #include +#include #include #include -#include namespace boost::redis { namespace detail @@ -187,7 +187,65 @@ private: /** \brief A connection that uses the asio::any_io_executor. * \ingroup high-level-api */ -using connection = basic_connection; +class connection { +public: + /// Executor type. + using executor_type = asio::any_io_executor; + + /// Contructs from an executor. + explicit connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client); + + /// Contructs from a context. + explicit connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client); + + executor_type get_executor() noexcept { return impl_.get_executor(); } + + template + auto async_run(config const& cfg, logger l, CompletionToken token) + { + return asio::async_initiate< + CompletionToken, void(boost::system::error_code)>( + [](auto handler, connection* self, config const* cfg, logger l) + { + self->async_run_impl(*cfg, l, std::move(handler)); + }, token, this, &cfg, l); + } + + template + auto async_receive(Response& response, CompletionToken token) + { + return impl_.async_receive(response, std::move(token)); + } + + template + auto async_exec(request const& req, Response& resp, CompletionToken token) + { + return impl_.async_exec(req, resp, std::move(token)); + } + + void cancel(operation op = operation::all); + + /// Returns true if the connection was canceled. + bool will_reconnect() const noexcept + { return impl_.will_reconnect();} + + /// Returns a reference to the next layer. + auto& next_layer() noexcept { return impl_.next_layer(); } + + /// Returns a const reference to the next layer. + auto const& next_layer() const noexcept { return impl_.next_layer(); } + + void reset_stream() { impl_.reset_stream();} + +private: + void + async_run_impl( + config const& cfg, + logger l, + asio::any_completion_handler token); + + basic_connection impl_; +}; } // boost::redis diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp new file mode 100644 index 00000000..977031a5 --- /dev/null +++ b/include/boost/redis/impl/connection.ipp @@ -0,0 +1,33 @@ +/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include + +namespace boost::redis { + +connection::connection(executor_type ex, asio::ssl::context::method method) +: impl_{ex, method} +{ } + +connection::connection(asio::io_context& ioc, asio::ssl::context::method method) +: impl_(ioc.get_executor(), method) +{ } + +void +connection::async_run_impl( + config const& cfg, + logger l, + asio::any_completion_handler token) +{ + impl_.async_run(cfg, l, std::move(token)); +} + +void connection::cancel(operation op) +{ + impl_.cancel(op); +} + +} // namespace boost::redis diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index ef978afc..3a06c3e0 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include diff --git a/tests/test_issue_50.cpp b/tests/test_issue_50.cpp index 9abab044..dabffd84 100644 --- a/tests/test_issue_50.cpp +++ b/tests/test_issue_50.cpp @@ -30,10 +30,10 @@ using boost::redis::ignore; using boost::redis::logger; using boost::redis::config; using boost::redis::operation; +using boost::redis::connection; using boost::system::error_code; using boost::asio::use_awaitable; using boost::asio::redirect_error; -using connection = boost::asio::use_awaitable_t<>::as_default_on_t; using namespace std::chrono_literals; // Push consumer