mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Makes connection_base a private base.
This commit is contained in:
@@ -31,7 +31,7 @@ namespace aedis {
|
||||
*/
|
||||
template <class AsyncReadWriteStream = boost::asio::ip::tcp::socket>
|
||||
class connection :
|
||||
public connection_base<
|
||||
private connection_base<
|
||||
typename AsyncReadWriteStream::executor_type,
|
||||
connection<AsyncReadWriteStream>> {
|
||||
public:
|
||||
@@ -40,6 +40,10 @@ public:
|
||||
|
||||
/// Type of the next layer
|
||||
using next_layer_type = AsyncReadWriteStream;
|
||||
using base_type = connection_base<executor_type, connection<AsyncReadWriteStream>>;
|
||||
|
||||
/// List of operations that can be canceled.
|
||||
using operation = typename base_type::operation;
|
||||
|
||||
/** @brief Connection configuration parameters.
|
||||
*/
|
||||
@@ -67,6 +71,9 @@ public:
|
||||
: connection(ioc.get_executor())
|
||||
{ }
|
||||
|
||||
/// Returns the associated executor.
|
||||
auto get_executor() {return stream_.get_executor();}
|
||||
|
||||
/// Resets the underlying stream.
|
||||
void reset_stream()
|
||||
{
|
||||
@@ -174,28 +181,117 @@ public:
|
||||
return base_type::async_run(ep, req, adapter, ts, std::move(token));
|
||||
}
|
||||
|
||||
/** @brief Executes a command on the Redis server asynchronously.
|
||||
*
|
||||
* This function will send a request to the Redis server and
|
||||
* complete when the response arrives. If the request contains
|
||||
* only commands that don't expect a response, the completion
|
||||
* occurs after it has been written to the underlying stream.
|
||||
* Multiple concurrent calls to this function will be
|
||||
* automatically queued by the implementation.
|
||||
*
|
||||
* @param req Request object.
|
||||
* @param adapter Response adapter.
|
||||
* @param token Asio completion token.
|
||||
*
|
||||
* For an example see echo_server.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(boost::system::error_code, std::size_t);
|
||||
* @endcode
|
||||
*
|
||||
* Where the second parameter is the size of the response in
|
||||
* bytes.
|
||||
*/
|
||||
template <
|
||||
class Adapter = detail::response_traits<void>::adapter_type,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
auto async_exec(
|
||||
resp3::request const& req,
|
||||
Adapter adapter = adapt(),
|
||||
CompletionToken token = CompletionToken{})
|
||||
{
|
||||
return base_type::async_exec(req, adapter, std::move(token));
|
||||
}
|
||||
|
||||
/** @brief Receives server side pushes asynchronously.
|
||||
*
|
||||
* Users that expect server pushes should call this function in a
|
||||
* loop. If a push arrives and there is no reader, the connection
|
||||
* will hang and eventually timeout.
|
||||
*
|
||||
* @param adapter The response adapter.
|
||||
* @param token The Asio completion token.
|
||||
*
|
||||
* For an example see subscriber.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(boost::system::error_code, std::size_t);
|
||||
* @endcode
|
||||
*
|
||||
* Where the second parameter is the size of the push in
|
||||
* bytes.
|
||||
*/
|
||||
template <
|
||||
class Adapter = detail::response_traits<void>::adapter_type,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
auto async_receive_push(
|
||||
Adapter adapter = adapt(),
|
||||
CompletionToken token = CompletionToken{})
|
||||
{
|
||||
return base_type::async_receive_push(adapter, std::move(token));
|
||||
}
|
||||
|
||||
/** @brief Cancel operations.
|
||||
*
|
||||
* @li `operation::exec`: Cancels operations started with
|
||||
* `async_exec`. Has precedence over
|
||||
* `request::config::close_on_connection_lost`
|
||||
* @li operation::run: Cancels the `async_run` operation. Notice
|
||||
* that the preferred way to close a connection is to send a
|
||||
* [QUIT](https://redis.io/commands/quit/) command to the server.
|
||||
* An unresponsive Redis server will also cause the idle-checks to
|
||||
* timeout and lead to `connection::async_run` completing with
|
||||
* `error::idle_timeout`. Calling `cancel(operation::run)`
|
||||
* directly should be seen as the last option.
|
||||
* @li operation::receive_push: Cancels any ongoing callto
|
||||
* `async_receive_push`.
|
||||
*
|
||||
* @param op: The operation to be cancelled.
|
||||
* @returns The number of operations that have been canceled.
|
||||
*/
|
||||
auto cancel(operation op) -> std::size_t
|
||||
{ return base_type::cancel(op); }
|
||||
|
||||
private:
|
||||
using base_type = connection_base<executor_type, connection<AsyncReadWriteStream>>;
|
||||
using this_type = connection<next_layer_type>;
|
||||
|
||||
template <class, class> friend class connection_base;
|
||||
template <class, class> friend struct detail::exec_read_op;
|
||||
template <class, class> friend struct detail::exec_op;
|
||||
template <class, class> friend struct detail::receive_push_op;
|
||||
template <class> friend struct detail::ping_op;
|
||||
template <class> friend struct detail::check_idle_op;
|
||||
template <class> friend struct detail::reader_op;
|
||||
template <class> friend struct detail::writer_op;
|
||||
template <class> friend struct detail::connect_with_timeout_op;
|
||||
template <class, class> friend struct detail::connect_with_timeout_op;
|
||||
template <class> friend struct detail::run_op;
|
||||
template <class> friend struct aedis::detail::ping_op;
|
||||
|
||||
template <class CompletionToken>
|
||||
auto async_connect(timeouts ts, CompletionToken&& token)
|
||||
template <class Timer, class CompletionToken>
|
||||
auto
|
||||
async_connect(
|
||||
boost::asio::ip::tcp::resolver::results_type const& endpoints,
|
||||
timeouts ts,
|
||||
Timer& timer,
|
||||
CompletionToken&& token)
|
||||
{
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(detail::connect_with_timeout_op<this_type>{this, ts}, token, stream_);
|
||||
>(detail::connect_with_timeout_op<this_type, Timer>{this, &endpoints, ts, &timer},
|
||||
token, stream_);
|
||||
}
|
||||
|
||||
void close() { stream_.close(); }
|
||||
|
||||
@@ -27,8 +27,7 @@
|
||||
|
||||
namespace aedis {
|
||||
|
||||
/** @brief Base class for high level Redis asynchronous connections.
|
||||
* @ingroup any
|
||||
/** Base class for high level Redis asynchronous connections.
|
||||
*
|
||||
* This class is not meant to be instantiated directly but as base
|
||||
* class in the CRTP.
|
||||
@@ -40,8 +39,8 @@ namespace aedis {
|
||||
template <class Executor, class Derived>
|
||||
class connection_base {
|
||||
public:
|
||||
/// Executor type.
|
||||
using executor_type = Executor;
|
||||
using this_type = connection_base<Executor, Derived>;
|
||||
|
||||
/** @brief List of async operations exposed by this class.
|
||||
*
|
||||
@@ -57,10 +56,6 @@ public:
|
||||
receive_push,
|
||||
};
|
||||
|
||||
/** @brief Constructor
|
||||
*
|
||||
* @param ex The executor.
|
||||
*/
|
||||
explicit connection_base(executor_type ex)
|
||||
: resv_{ex}
|
||||
, ping_timer_{ex}
|
||||
@@ -75,27 +70,8 @@ public:
|
||||
read_timer_.expires_at(std::chrono::steady_clock::time_point::max());
|
||||
}
|
||||
|
||||
/// Returns the executor.
|
||||
auto get_executor() {return resv_.get_executor();}
|
||||
|
||||
/** @brief Cancel operations.
|
||||
*
|
||||
* @li `operation::exec`: Cancels operations started with
|
||||
* `async_exec`. Has precedence over
|
||||
* `request::config::close_on_connection_lost`
|
||||
* @li operation::run: Cancels the `async_run` operation. Notice
|
||||
* that the preferred way to close a connection is to send a
|
||||
* [QUIT](https://redis.io/commands/quit/) command to the server.
|
||||
* An unresponsive Redis server will also cause the idle-checks to
|
||||
* timeout and lead to `connection::async_run` completing with
|
||||
* `error::idle_timeout`. Calling `cancel(operation::run)`
|
||||
* directly should be seen as the last option.
|
||||
* @li operation::receive_push: Cancels any ongoing callto
|
||||
* `async_receive_push`.
|
||||
*
|
||||
* @param op: The operation to be cancelled.
|
||||
* @returns The number of operations that have been canceled.
|
||||
*/
|
||||
auto cancel(operation op) -> std::size_t
|
||||
{
|
||||
switch (op) {
|
||||
@@ -142,29 +118,6 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
/** @brief Executes a command on the Redis server asynchronously.
|
||||
*
|
||||
* This function will send a request to the Redis server and
|
||||
* complete when the response arrives. If the request contains
|
||||
* only commands that don't expect a response, the completion
|
||||
* occurs after it has been written to the underlying stream.
|
||||
* Multiple concurrent calls to this function will be
|
||||
* automatically queued by the implementation.
|
||||
*
|
||||
* @param req Request object.
|
||||
* @param adapter Response adapter.
|
||||
* @param token Asio completion token.
|
||||
*
|
||||
* For an example see echo_server.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(boost::system::error_code, std::size_t);
|
||||
* @endcode
|
||||
*
|
||||
* Where the second parameter is the size of the response in
|
||||
* bytes.
|
||||
*/
|
||||
template <
|
||||
class Adapter = detail::response_traits<void>::adapter_type,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
@@ -181,25 +134,6 @@ public:
|
||||
>(detail::exec_op<Derived, Adapter>{&derived(), &req, adapter}, token, resv_);
|
||||
}
|
||||
|
||||
/** @brief Receives server side pushes asynchronously.
|
||||
*
|
||||
* Users that expect server pushes should call this function in a
|
||||
* loop. If a push arrives and there is no reader, the connection
|
||||
* will hang and eventually timeout.
|
||||
*
|
||||
* @param adapter The response adapter.
|
||||
* @param token The Asio completion token.
|
||||
*
|
||||
* For an example see subscriber.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(boost::system::error_code, std::size_t);
|
||||
* @endcode
|
||||
*
|
||||
* Where the second parameter is the size of the push in
|
||||
* bytes.
|
||||
*/
|
||||
template <
|
||||
class Adapter = detail::response_traits<void>::adapter_type,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
@@ -214,7 +148,6 @@ public:
|
||||
>(detail::receive_push_op<Derived, decltype(f)>{&derived(), f}, token, resv_);
|
||||
}
|
||||
|
||||
protected:
|
||||
template <class Timeouts, class CompletionToken>
|
||||
auto
|
||||
async_run(endpoint ep, Timeouts ts, CompletionToken token)
|
||||
@@ -307,7 +240,7 @@ private:
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(detail::resolve_with_timeout_op<Derived>{&derived(), d},
|
||||
>(detail::resolve_with_timeout_op<this_type>{this, d},
|
||||
token, resv_);
|
||||
}
|
||||
|
||||
@@ -337,7 +270,7 @@ private:
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(detail::start_op<Derived, Timeouts>{&derived(), ts}, token, resv_);
|
||||
>(detail::start_op<this_type, Timeouts>{this, ts}, token, resv_);
|
||||
}
|
||||
|
||||
template <class CompletionToken>
|
||||
@@ -428,13 +361,7 @@ private:
|
||||
|
||||
// IO objects
|
||||
resolver_type resv_;
|
||||
protected:
|
||||
timer_type ping_timer_;
|
||||
endpoint ep_;
|
||||
// The result of async_resolve.
|
||||
boost::asio::ip::tcp::resolver::results_type endpoints_;
|
||||
|
||||
private:
|
||||
timer_type check_idle_timer_;
|
||||
timer_type writer_timer_;
|
||||
timer_type read_timer_;
|
||||
@@ -450,6 +377,9 @@ private:
|
||||
|
||||
resp3::request req_;
|
||||
std::vector<resp3::node<std::string>> response_;
|
||||
endpoint ep_;
|
||||
// The result of async_resolve.
|
||||
boost::asio::ip::tcp::resolver::results_type endpoints_;
|
||||
};
|
||||
|
||||
} // aedis
|
||||
|
||||
@@ -30,10 +30,12 @@
|
||||
|
||||
namespace aedis::detail {
|
||||
|
||||
template <class Conn>
|
||||
template <class Conn, class Timer>
|
||||
struct connect_with_timeout_op {
|
||||
Conn* conn = nullptr;
|
||||
boost::asio::ip::tcp::resolver::results_type const* endpoints = nullptr;
|
||||
typename Conn::timeouts ts;
|
||||
Timer* timer = nullptr;
|
||||
boost::asio::coroutine coro{};
|
||||
|
||||
template <class Self>
|
||||
@@ -43,10 +45,10 @@ struct connect_with_timeout_op {
|
||||
{
|
||||
reenter (coro)
|
||||
{
|
||||
conn->ping_timer_.expires_after(ts.connect_timeout);
|
||||
timer->expires_after(ts.connect_timeout);
|
||||
yield
|
||||
detail::async_connect(
|
||||
conn->next_layer(), conn->ping_timer_, conn->endpoints_, std::move(self));
|
||||
conn->next_layer(), *timer, *endpoints, std::move(self));
|
||||
self.complete(ec);
|
||||
}
|
||||
}
|
||||
@@ -425,7 +427,7 @@ struct run_op {
|
||||
}
|
||||
|
||||
yield
|
||||
conn->derived().async_connect(ts, std::move(self));
|
||||
conn->derived().async_connect(conn->endpoints_, ts, conn->ping_timer_, std::move(self));
|
||||
if (ec) {
|
||||
conn->cancel(Conn::operation::run);
|
||||
self.complete(ec);
|
||||
|
||||
@@ -35,7 +35,7 @@ class connection;
|
||||
*/
|
||||
template <class AsyncReadWriteStream>
|
||||
class connection<boost::asio::ssl::stream<AsyncReadWriteStream>> :
|
||||
public connection_base<
|
||||
private connection_base<
|
||||
typename boost::asio::ssl::stream<AsyncReadWriteStream>::executor_type,
|
||||
connection<boost::asio::ssl::stream<AsyncReadWriteStream>>> {
|
||||
public:
|
||||
@@ -44,6 +44,10 @@ public:
|
||||
|
||||
/// Executor type.
|
||||
using executor_type = typename next_layer_type::executor_type;
|
||||
using base_type = connection_base<executor_type, connection<boost::asio::ssl::stream<AsyncReadWriteStream>>>;
|
||||
|
||||
/// List of operations that can be canceled.
|
||||
using operation = typename base_type::operation;
|
||||
|
||||
/** @brief Connection configuration parameters.
|
||||
*/
|
||||
@@ -76,6 +80,9 @@ public:
|
||||
: connection(ioc.get_executor(), ctx)
|
||||
{ }
|
||||
|
||||
/// Returns the associated executor.
|
||||
auto get_executor() {return stream_.get_executor();}
|
||||
|
||||
/// Reset the underlying stream.
|
||||
void reset_stream(boost::asio::ssl::context& ctx)
|
||||
{
|
||||
@@ -119,30 +126,71 @@ public:
|
||||
return base_type::async_run(ep, req, adapter, ts, std::move(token));
|
||||
}
|
||||
|
||||
/** @brief Executes a command on the Redis server asynchronously.
|
||||
*
|
||||
* See aedis::connection::async_exec for detailed information.
|
||||
*/
|
||||
template <
|
||||
class Adapter = aedis::detail::response_traits<void>::adapter_type,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
auto async_exec(
|
||||
resp3::request const& req,
|
||||
Adapter adapter = adapt(),
|
||||
CompletionToken token = CompletionToken{})
|
||||
{
|
||||
return base_type::async_exec(req, adapter, std::move(token));
|
||||
}
|
||||
|
||||
/** @brief Receives server side pushes asynchronously.
|
||||
*
|
||||
* See aedis::connection::async_receive_push for detailed information.
|
||||
*/
|
||||
template <
|
||||
class Adapter = aedis::detail::response_traits<void>::adapter_type,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<executor_type>>
|
||||
auto async_receive_push(
|
||||
Adapter adapter = adapt(),
|
||||
CompletionToken token = CompletionToken{})
|
||||
{
|
||||
return base_type::async_receive_push(adapter, std::move(token));
|
||||
}
|
||||
|
||||
/** @brief Cancel operations.
|
||||
*
|
||||
* See aedis::connection::cancel for detailed information.
|
||||
*/
|
||||
auto cancel(operation op) -> std::size_t
|
||||
{ return base_type::cancel(op); }
|
||||
|
||||
private:
|
||||
using base_type = connection_base<executor_type, connection<boost::asio::ssl::stream<AsyncReadWriteStream>>>;
|
||||
using this_type = connection<next_layer_type>;
|
||||
|
||||
template <class, class> friend class aedis::connection_base;
|
||||
template <class, class> friend struct aedis::detail::exec_op;
|
||||
template <class> friend struct detail::ssl_connect_with_timeout_op;
|
||||
template <class, class> friend struct detail::ssl_connect_with_timeout_op;
|
||||
template <class> friend struct aedis::detail::run_op;
|
||||
template <class> friend struct aedis::detail::writer_op;
|
||||
template <class> friend struct aedis::detail::ping_op;
|
||||
template <class> friend struct aedis::detail::check_idle_op;
|
||||
template <class> friend struct aedis::detail::reader_op;
|
||||
template <class, class> friend struct aedis::detail::exec_read_op;
|
||||
template <class> friend struct aedis::detail::ping_op;
|
||||
|
||||
auto& lowest_layer() noexcept { return stream_.lowest_layer(); }
|
||||
auto is_open() const noexcept { return stream_.next_layer().is_open(); }
|
||||
void close() { stream_.next_layer().close(); }
|
||||
|
||||
template <class CompletionToken>
|
||||
auto async_connect(timeouts ts, CompletionToken&& token)
|
||||
template <class Timer, class CompletionToken>
|
||||
auto
|
||||
async_connect(
|
||||
boost::asio::ip::tcp::resolver::results_type const& endpoints,
|
||||
timeouts ts,
|
||||
Timer& timer,
|
||||
CompletionToken&& token)
|
||||
{
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(detail::ssl_connect_with_timeout_op<this_type>{this, ts}, token, stream_);
|
||||
>(detail::ssl_connect_with_timeout_op<this_type, Timer>{this, &endpoints, ts, &timer}, token, stream_);
|
||||
}
|
||||
|
||||
next_layer_type stream_;
|
||||
|
||||
@@ -72,10 +72,12 @@ auto async_handshake(
|
||||
>(handshake_op<Stream>{&stream, &timer}, token, stream, timer);
|
||||
}
|
||||
|
||||
template <class Conn>
|
||||
template <class Conn, class Timer>
|
||||
struct ssl_connect_with_timeout_op {
|
||||
Conn* conn = nullptr;
|
||||
boost::asio::ip::tcp::resolver::results_type const* endpoints = nullptr;
|
||||
typename Conn::timeouts ts;
|
||||
Timer* timer = nullptr;
|
||||
boost::asio::coroutine coro{};
|
||||
|
||||
template <class Self>
|
||||
@@ -85,21 +87,21 @@ struct ssl_connect_with_timeout_op {
|
||||
{
|
||||
reenter (coro)
|
||||
{
|
||||
conn->ping_timer_.expires_after(ts.connect_timeout);
|
||||
timer->expires_after(ts.connect_timeout);
|
||||
|
||||
yield
|
||||
aedis::detail::async_connect(
|
||||
conn->lowest_layer(), conn->ping_timer_, conn->endpoints_, std::move(self));
|
||||
conn->lowest_layer(), *timer, *endpoints, std::move(self));
|
||||
|
||||
if (ec) {
|
||||
self.complete(ec);
|
||||
return;
|
||||
}
|
||||
|
||||
conn->ping_timer_.expires_after(ts.handshake_timeout);
|
||||
timer->expires_after(ts.handshake_timeout);
|
||||
|
||||
yield
|
||||
async_handshake(conn->next_layer(), conn->ping_timer_, std::move(self));
|
||||
async_handshake(conn->next_layer(), *timer, std::move(self));
|
||||
self.complete(ec);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user