2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Make the connection non-generic on the executor type.

This commit is contained in:
Marcelo Zimbres
2023-06-11 11:33:07 +02:00
parent 607946f00e
commit 82430afc8b
14 changed files with 119 additions and 26 deletions

View File

@@ -19,11 +19,12 @@
namespace net = boost::asio;
using stream_descriptor = net::deferred_t::as_default_on_t<net::posix::stream_descriptor>;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::ignore;
using net::redirect_error;
using net::use_awaitable;
using boost::system::error_code;
@@ -41,7 +42,7 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Subscribe to channels.
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
// Loop reading Redis push messages.
for (generic_response resp;;) {
@@ -66,7 +67,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
auto n = co_await net::async_read_until(*in, net::dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
msg.erase(0, n);
}
}

View File

@@ -20,7 +20,7 @@ using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::ignore;
using boost::redis::config;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
void print(std::map<std::string, std::string> const& cont)
{
@@ -47,7 +47,7 @@ auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
}
auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -60,7 +60,7 @@ auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
response<std::map<std::string, std::string>> resp;
// Executes the request and reads the response.
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
print(std::get<0>(resp).value());
}
@@ -81,7 +81,7 @@ auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
response<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>> // exec
> resp;
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
print(std::get<0>(std::get<3>(resp).value()).value().value());
print(std::get<1>(std::get<3>(resp).value()).value().value());

View File

@@ -18,11 +18,11 @@ namespace net = boost::asio;
using tcp_socket = net::deferred_t::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::deferred_t::as_default_on_t<net::ip::tcp::acceptor>;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::system::error_code;
using boost::redis::connection;
using namespace std::chrono_literals;
auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -33,7 +33,7 @@ auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) ->
for (std::string buffer;;) {
auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n");
req.push("PING", buffer);
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
co_await net::async_write(socket, net::buffer(std::get<0>(resp).value()));
std::get<0>(resp).value().clear();
req.clear();

View File

@@ -17,8 +17,7 @@ namespace net = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::logger;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
@@ -34,7 +33,7 @@ auto co_main(config cfg) -> net::awaitable<void>
response<std::string> resp;
// Executes the request.
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
std::cout << "PING: " << std::get<0>(resp).value() << std::endl;

View File

@@ -18,7 +18,7 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::logger;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
auto verify_certificate(bool, net::ssl::verify_context&) -> bool
{
@@ -45,7 +45,7 @@ auto co_main(config cfg) -> net::awaitable<void>
conn->next_layer().set_verify_mode(net::ssl::verify_peer);
conn->next_layer().set_verify_callback(verify_certificate);
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
std::cout << "Response: " << std::get<0>(resp).value() << std::endl;

View File

@@ -29,7 +29,7 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::config;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
// Struct that will be stored in Redis using json serialization.
struct user {
@@ -64,7 +64,7 @@ auto co_main(config cfg) -> net::awaitable<void>
response<ignore_t, user> resp;
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
// Prints the first ping

View File

@@ -25,7 +25,7 @@ using boost::redis::response;
using boost::redis::operation;
using boost::redis::ignore_t;
using boost::redis::config;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
// The protobuf type described in examples/person.proto
using tutorial::person;
@@ -76,7 +76,7 @@ net::awaitable<void> co_main(config cfg)
response<ignore_t, person> resp;
// Sends the request and receives the response.
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
std::cout

View File

@@ -19,7 +19,7 @@ using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::config;
using boost::redis::address;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
auto redir(boost::system::error_code& ec)
{ return net::redirect_error(net::use_awaitable, ec); }

View File

@@ -25,7 +25,7 @@ using boost::redis::config;
using boost::redis::generic_response;
using boost::redis::operation;
using boost::redis::request;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -39,7 +39,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
for (;;) {
req.push("XREAD", "BLOCK", "0", "STREAMS", "test-topic", stream_id);
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
// std::cout << "Response: ";
// for (int i = 0; i < resp->value().size(); ++i) {

View File

@@ -24,8 +24,9 @@ using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::logger;
using boost::redis::config;
using boost::redis::ignore;
using boost::system::error_code;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
/* This example will subscribe and read pushes indefinitely.
@@ -54,7 +55,7 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
// Loop reading Redis pushs messages.
for (generic_response resp;;) {

View File

@@ -14,10 +14,10 @@
#include <boost/asio/coroutine.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/any_completion_handler.hpp>
#include <chrono>
#include <memory>
#include <iostream>
namespace boost::redis {
namespace detail
@@ -187,7 +187,65 @@ private:
/** \brief A connection that uses the asio::any_io_executor.
* \ingroup high-level-api
*/
using connection = basic_connection<asio::any_io_executor>;
class connection {
public:
/// Executor type.
using executor_type = asio::any_io_executor;
/// Contructs from an executor.
explicit connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client);
/// Contructs from a context.
explicit connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client);
executor_type get_executor() noexcept { return impl_.get_executor(); }
template <class CompletionToken>
auto async_run(config const& cfg, logger l, CompletionToken token)
{
return asio::async_initiate<
CompletionToken, void(boost::system::error_code)>(
[](auto handler, connection* self, config const* cfg, logger l)
{
self->async_run_impl(*cfg, l, std::move(handler));
}, token, this, &cfg, l);
}
template <class Response, class CompletionToken>
auto async_receive(Response& response, CompletionToken token)
{
return impl_.async_receive(response, std::move(token));
}
template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
{
return impl_.async_exec(req, resp, std::move(token));
}
void cancel(operation op = operation::all);
/// Returns true if the connection was canceled.
bool will_reconnect() const noexcept
{ return impl_.will_reconnect();}
/// Returns a reference to the next layer.
auto& next_layer() noexcept { return impl_.next_layer(); }
/// Returns a const reference to the next layer.
auto const& next_layer() const noexcept { return impl_.next_layer(); }
void reset_stream() { impl_.reset_stream();}
private:
void
async_run_impl(
config const& cfg,
logger l,
asio::any_completion_handler<void(boost::system::error_code)> token);
basic_connection<executor_type> impl_;
};
} // boost::redis

View File

@@ -0,0 +1,33 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
namespace boost::redis {
connection::connection(executor_type ex, asio::ssl::context::method method)
: impl_{ex, method}
{ }
connection::connection(asio::io_context& ioc, asio::ssl::context::method method)
: impl_(ioc.get_executor(), method)
{ }
void
connection::async_run_impl(
config const& cfg,
logger l,
asio::any_completion_handler<void(boost::system::error_code)> token)
{
impl_.async_run(cfg, l, std::move(token));
}
void connection::cancel(operation op)
{
impl_.cancel(op);
}
} // namespace boost::redis

View File

@@ -8,6 +8,7 @@
#include <boost/redis/impl/logger.ipp>
#include <boost/redis/impl/request.ipp>
#include <boost/redis/impl/ignore.ipp>
#include <boost/redis/impl/connection.ipp>
#include <boost/redis/resp3/impl/type.ipp>
#include <boost/redis/resp3/impl/parser.ipp>
#include <boost/redis/resp3/impl/serialization.ipp>

View File

@@ -30,10 +30,10 @@ using boost::redis::ignore;
using boost::redis::logger;
using boost::redis::config;
using boost::redis::operation;
using boost::redis::connection;
using boost::system::error_code;
using boost::asio::use_awaitable;
using boost::asio::redirect_error;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using namespace std::chrono_literals;
// Push consumer