mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Uses composition instead of inheritance in the connection class.
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
cmake_minimum_required(VERSION 3.14)
|
||||
|
||||
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time")
|
||||
#set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time")
|
||||
|
||||
project(
|
||||
boost_redis
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
#ifndef BOOST_REDIS_CONNECTION_HPP
|
||||
#define BOOST_REDIS_CONNECTION_HPP
|
||||
|
||||
#include <boost/redis/connection_base.hpp>
|
||||
#include <boost/redis/detail/connection_base.hpp>
|
||||
#include <boost/redis/logger.hpp>
|
||||
#include <boost/redis/config.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
@@ -34,7 +34,7 @@ struct reconnection_op {
|
||||
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
|
||||
{
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn_->async_run_one(conn_->cfg_, logger_, std::move(self));
|
||||
conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self));
|
||||
conn_->cancel(operation::receive);
|
||||
logger_.on_connection_lost(ec);
|
||||
if (!conn_->will_reconnect() || is_cancelled(self)) {
|
||||
@@ -68,14 +68,15 @@ struct reconnection_op {
|
||||
*
|
||||
*/
|
||||
template <class Executor>
|
||||
class basic_connection : public connection_base<Executor> {
|
||||
class basic_connection {
|
||||
public:
|
||||
using base_type = connection_base<Executor>;
|
||||
using this_type = basic_connection<Executor>;
|
||||
|
||||
/// Executor type.
|
||||
using executor_type = Executor;
|
||||
|
||||
/// Returns the underlying executor.
|
||||
executor_type get_executor() noexcept
|
||||
{ return impl_.get_executor(); }
|
||||
|
||||
/// Rebinds the socket type to another executor.
|
||||
template <class Executor1>
|
||||
struct rebind_executor
|
||||
@@ -87,7 +88,7 @@ public:
|
||||
/// Contructs from an executor.
|
||||
explicit
|
||||
basic_connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client)
|
||||
: base_type{ex, method}
|
||||
: impl_{ex, method}
|
||||
, timer_{ex}
|
||||
{ }
|
||||
|
||||
@@ -97,12 +98,28 @@ public:
|
||||
: basic_connection(ioc.get_executor(), method)
|
||||
{ }
|
||||
|
||||
/** @brief High-level connection to Redis.
|
||||
/** @brief Starts underlying connection operations.
|
||||
*
|
||||
* This connection class adds reconnection functionality to
|
||||
* `boost::redis::connection_base::async_run_one`. When a
|
||||
* connection is lost for any reason, a new one is stablished
|
||||
* automatically. To disable reconnection call
|
||||
* This member function provides the following functionality
|
||||
*
|
||||
* 1. Resolve the address passed on `boost::redis::config::addr`.
|
||||
* 2. Connect to one of the results obtained in the resolve operation.
|
||||
* 3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`.
|
||||
* 4. Start a health-check operation where ping commands are sent
|
||||
* at intervals specified in
|
||||
* `boost::redis::config::health_check_interval`. The message passed to
|
||||
* `PING` will be `boost::redis::config::health_check_id`. Passing a
|
||||
* timeout with value zero will disable health-checks. If the Redis
|
||||
* server does not respond to a health-check within two times the value
|
||||
* specified here, it will be considered unresponsive and the connection
|
||||
* will be closed and a new connection will be stablished.
|
||||
* 5. Starts read and write operations with the Redis
|
||||
* server. More specifically it will trigger the write of all
|
||||
* requests i.e. calls to `async_exec` that happened prior to this
|
||||
* call.
|
||||
*
|
||||
* When a connection is lost for any reason, a new one is
|
||||
* stablished automatically. To disable reconnection call
|
||||
* `boost::redis::connection::cancel(operation::reconnection)`.
|
||||
*
|
||||
* @param cfg Configuration paramters.
|
||||
@@ -115,11 +132,6 @@ public:
|
||||
* void f(system::error_code);
|
||||
* @endcode
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* * This function will complete only if reconnection was disabled
|
||||
* and the connection is lost.
|
||||
*
|
||||
* For example on how to call this function refer to
|
||||
* cpp20_intro.cpp or any other example.
|
||||
*/
|
||||
@@ -132,6 +144,8 @@ public:
|
||||
Logger l = Logger{},
|
||||
CompletionToken token = CompletionToken{})
|
||||
{
|
||||
using this_type = basic_connection<executor_type>;
|
||||
|
||||
cfg_ = cfg;
|
||||
l.set_prefix(cfg_.log_prefix);
|
||||
return asio::async_compose
|
||||
@@ -140,6 +154,77 @@ public:
|
||||
>(detail::reconnection_op<this_type, Logger>{this, l}, token, timer_);
|
||||
}
|
||||
|
||||
/** @brief Receives server side pushes asynchronously.
|
||||
*
|
||||
* When pushes arrive and there is no `async_receive` operation in
|
||||
* progress, pushed data, requests, and responses will be paused
|
||||
* until `async_receive` is called again. Apps will usually want
|
||||
* to call `async_receive` in a loop.
|
||||
*
|
||||
* To cancel an ongoing receive operation apps should call
|
||||
* `connection::cancel(operation::receive)`.
|
||||
*
|
||||
* @param response Response object.
|
||||
* @param token Completion token.
|
||||
*
|
||||
* For an example see cpp20_subscriber.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(system::error_code, std::size_t);
|
||||
* @endcode
|
||||
*
|
||||
* Where the second parameter is the size of the push received in
|
||||
* bytes.
|
||||
*/
|
||||
template <
|
||||
class Response = ignore_t,
|
||||
class CompletionToken = asio::default_completion_token_t<executor_type>
|
||||
>
|
||||
auto
|
||||
async_receive(
|
||||
Response& response,
|
||||
CompletionToken token = CompletionToken{})
|
||||
{
|
||||
return impl_.async_receive(response, token);
|
||||
}
|
||||
|
||||
/** @brief Executes commands on the Redis server asynchronously.
|
||||
*
|
||||
* This function sends a request to the Redis server and waits for
|
||||
* the responses to each individual command in the request. If the
|
||||
* request contains only commands that don't expect a response,
|
||||
* the completion occurs after it has been written to the
|
||||
* underlying stream. Multiple concurrent calls to this function
|
||||
* will be automatically queued by the implementation.
|
||||
*
|
||||
* @param req Request.
|
||||
* @param resp Response.
|
||||
* @param token Completion token.
|
||||
*
|
||||
* For an example see cpp20_echo_server.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(system::error_code, std::size_t);
|
||||
* @endcode
|
||||
*
|
||||
* Where the second parameter is the size of the response received
|
||||
* in bytes.
|
||||
*/
|
||||
template <
|
||||
class Response = ignore_t,
|
||||
class CompletionToken = asio::default_completion_token_t<executor_type>
|
||||
>
|
||||
auto
|
||||
async_exec(
|
||||
request const& req,
|
||||
Response& resp = ignore,
|
||||
CompletionToken token = CompletionToken{})
|
||||
{
|
||||
return impl_.async_exec(req, resp, token);
|
||||
}
|
||||
|
||||
/** @brief Cancel operations.
|
||||
*
|
||||
* @li `operation::exec`: Cancels operations started with
|
||||
@@ -152,7 +237,7 @@ public:
|
||||
* @param op: The operation to be cancelled.
|
||||
* @returns The number of operations that have been canceled.
|
||||
*/
|
||||
void cancel(operation op = operation::all) override
|
||||
void cancel(operation op = operation::all)
|
||||
{
|
||||
switch (op) {
|
||||
case operation::reconnection:
|
||||
@@ -163,16 +248,51 @@ public:
|
||||
default: /* ignore */;
|
||||
}
|
||||
|
||||
base_type::cancel(op);
|
||||
impl_.cancel(op);
|
||||
}
|
||||
|
||||
/// Returns true if the connection was canceled.
|
||||
bool will_reconnect() const noexcept
|
||||
{ return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
|
||||
|
||||
private:
|
||||
config cfg_;
|
||||
/** @brief Reserve memory on the read and write internal buffers.
|
||||
*
|
||||
* This function will call `std::string::reserve` on the
|
||||
* underlying buffers.
|
||||
*
|
||||
* @param read The new capacity of the read buffer.
|
||||
* @param write The new capacity of the write buffer.
|
||||
*/
|
||||
void reserve(std::size_t read, std::size_t write)
|
||||
{
|
||||
impl_.reserve(read, write);
|
||||
}
|
||||
|
||||
/// Sets the maximum size of the read buffer.
|
||||
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
|
||||
{ impl_.set_max_buffer_read_size(max_read_size); }
|
||||
|
||||
/// Returns the ssl context.
|
||||
auto const& get_ssl_context() const noexcept
|
||||
{ return impl_.get_ssl_context();}
|
||||
|
||||
/// Returns the ssl context.
|
||||
auto& get_ssl_context() noexcept
|
||||
{ return impl_.get_ssl_context();}
|
||||
|
||||
/// Resets the underlying stream.
|
||||
void reset_stream()
|
||||
{ impl_.reset_stream(); }
|
||||
|
||||
/// Returns a reference to the next layer.
|
||||
auto& next_layer() noexcept
|
||||
{ return impl_.next_layer(); }
|
||||
|
||||
/// Returns a const reference to the next layer.
|
||||
auto const& next_layer() const noexcept
|
||||
{ return impl_.next_layer(); }
|
||||
|
||||
private:
|
||||
using timer_type =
|
||||
asio::basic_waitable_timer<
|
||||
std::chrono::steady_clock,
|
||||
@@ -181,11 +301,19 @@ private:
|
||||
|
||||
template <class, class> friend struct detail::reconnection_op;
|
||||
|
||||
config cfg_;
|
||||
detail::connection_base<executor_type> impl_;
|
||||
timer_type timer_;
|
||||
};
|
||||
|
||||
/** \brief A connection that uses the asio::any_io_executor.
|
||||
/** \brief A basic_connection that type erases the executor.
|
||||
* \ingroup high-level-api
|
||||
*
|
||||
* This connection type uses the asio::any_io_executor and
|
||||
* asio::any_completion_token to reduce compilation times.
|
||||
*
|
||||
* For documentaiton of each member function see
|
||||
* `boost::redis::basic_connection`.
|
||||
*/
|
||||
class connection {
|
||||
public:
|
||||
@@ -198,8 +326,11 @@ public:
|
||||
/// Contructs from a context.
|
||||
explicit connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client);
|
||||
|
||||
executor_type get_executor() noexcept { return impl_.get_executor(); }
|
||||
/// Returns the underlying executor.
|
||||
executor_type get_executor() noexcept
|
||||
{ return impl_.get_executor(); }
|
||||
|
||||
/// Calls `boost::redis::basic_connection::async_run`.
|
||||
template <class CompletionToken>
|
||||
auto async_run(config const& cfg, logger l, CompletionToken token)
|
||||
{
|
||||
@@ -211,31 +342,38 @@ public:
|
||||
}, token, this, &cfg, l);
|
||||
}
|
||||
|
||||
/// Calls `boost::redis::basic_connection::async_receive`.
|
||||
template <class Response, class CompletionToken>
|
||||
auto async_receive(Response& response, CompletionToken token)
|
||||
{
|
||||
return impl_.async_receive(response, std::move(token));
|
||||
}
|
||||
|
||||
/// Calls `boost::redis::basic_connection::async_exec`.
|
||||
template <class Response, class CompletionToken>
|
||||
auto async_exec(request const& req, Response& resp, CompletionToken token)
|
||||
{
|
||||
return impl_.async_exec(req, resp, std::move(token));
|
||||
}
|
||||
|
||||
/// Calls `boost::redis::basic_connection::cancel`.
|
||||
void cancel(operation op = operation::all);
|
||||
|
||||
/// Returns true if the connection was canceled.
|
||||
/// Calls `boost::redis::basic_connection::will_reconnect`.
|
||||
bool will_reconnect() const noexcept
|
||||
{ return impl_.will_reconnect();}
|
||||
|
||||
/// Returns a reference to the next layer.
|
||||
auto& next_layer() noexcept { return impl_.next_layer(); }
|
||||
/// Calls `boost::redis::basic_connection::next_layer`.
|
||||
auto& next_layer() noexcept
|
||||
{ return impl_.next_layer(); }
|
||||
|
||||
/// Returns a const reference to the next layer.
|
||||
auto const& next_layer() const noexcept { return impl_.next_layer(); }
|
||||
/// Calls `boost::redis::basic_connection::next_layer`.
|
||||
auto const& next_layer() const noexcept
|
||||
{ return impl_.next_layer(); }
|
||||
|
||||
void reset_stream() { impl_.reset_stream();}
|
||||
/// Calls `boost::redis::basic_connection::reset_stream`.
|
||||
void reset_stream()
|
||||
{ impl_.reset_stream();}
|
||||
|
||||
private:
|
||||
void
|
||||
|
||||
@@ -37,8 +37,7 @@
|
||||
#include <string_view>
|
||||
#include <type_traits>
|
||||
|
||||
namespace boost::redis {
|
||||
namespace detail {
|
||||
namespace boost::redis::detail {
|
||||
|
||||
template <class Conn>
|
||||
struct wait_receive_op {
|
||||
@@ -473,7 +472,6 @@ struct reader_op {
|
||||
}
|
||||
}
|
||||
};
|
||||
} // detail
|
||||
|
||||
/** @brief Base class for high level Redis asynchronous connections.
|
||||
* @ingroup high-level-api
|
||||
@@ -549,29 +547,6 @@ public:
|
||||
cancel_impl(op);
|
||||
}
|
||||
|
||||
/** @brief Executes commands on the Redis server asynchronously.
|
||||
*
|
||||
* This function sends a request to the Redis server and waits for
|
||||
* the responses to each individual command in the request. If the
|
||||
* request contains only commands that don't expect a response,
|
||||
* the completion occurs after it has been written to the
|
||||
* underlying stream. Multiple concurrent calls to this function
|
||||
* will be automatically queued by the implementation.
|
||||
*
|
||||
* @param req Request.
|
||||
* @param resp Response.
|
||||
* @param token Completion token.
|
||||
*
|
||||
* For an example see cpp20_echo_server.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(system::error_code, std::size_t);
|
||||
* @endcode
|
||||
*
|
||||
* Where the second parameter is the size of the response received
|
||||
* in bytes.
|
||||
*/
|
||||
template <
|
||||
class Response = ignore_t,
|
||||
class CompletionToken = asio::default_completion_token_t<executor_type>
|
||||
@@ -592,29 +567,6 @@ public:
|
||||
>(redis::detail::exec_op<this_type, decltype(f)>{this, &req, f}, token, writer_timer_);
|
||||
}
|
||||
|
||||
/** @brief Receives server side pushes asynchronously.
|
||||
*
|
||||
* When pushes arrive and there is no `async_receive` operation in
|
||||
* progress, pushed data, requests, and responses will be paused
|
||||
* until `async_receive` is called again. Apps will usually want
|
||||
* to call `async_receive` in a loop.
|
||||
*
|
||||
* To cancel an ongoing receive operation apps should call
|
||||
* `connection::cancel(operation::receive)`.
|
||||
*
|
||||
* @param response Response object.
|
||||
* @param token Completion token.
|
||||
*
|
||||
* For an example see cpp20_subscriber.cpp. The completion token must
|
||||
* have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(system::error_code, std::size_t);
|
||||
* @endcode
|
||||
*
|
||||
* Where the second parameter is the size of the push received in
|
||||
* bytes.
|
||||
*/
|
||||
template <
|
||||
class Response = ignore_t,
|
||||
class CompletionToken = asio::default_completion_token_t<executor_type>
|
||||
@@ -634,60 +586,17 @@ public:
|
||||
>(redis::detail::receive_op<this_type, decltype(f)>{this, f}, token, read_op_timer_);
|
||||
}
|
||||
|
||||
/** @brief Starts underlying connection operations.
|
||||
*
|
||||
* Provides a high-level connection to the Redis server. It will
|
||||
* perform the following steps
|
||||
*
|
||||
* 1. Resolve the address passed on `boost::redis::config::addr`.
|
||||
* 2. Connect to one of the results obtained in the resolve operation.
|
||||
* 3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`.
|
||||
* 4. Start a health-check operation where ping commands are sent
|
||||
* at intervals specified in
|
||||
* `boost::redis::config::health_check_interval`. The message passed to
|
||||
* `PING` will be `boost::redis::config::health_check_id`. Passing a
|
||||
* timeout with value zero will disable health-checks. If the Redis
|
||||
* server does not respond to a health-check within two times the value
|
||||
* specified here, it will be considered unresponsive and the connection
|
||||
* will be closed and a new connection will be stablished.
|
||||
* 5. Starts read and write operations with the Redis
|
||||
* server. More specifically it will trigger the write of all
|
||||
* requests i.e. calls to `async_exec` that happened prior to this
|
||||
* call.
|
||||
*
|
||||
* @param cfg Configuration paramters.
|
||||
* @param l Logger object. The interface expected is specified in the class `boost::redis::logger`.
|
||||
* @param token Completion token.
|
||||
*
|
||||
* The completion token must have the following signature
|
||||
*
|
||||
* @code
|
||||
* void f(system::error_code);
|
||||
* @endcode
|
||||
*
|
||||
* For example on how to call this function refer to
|
||||
* cpp20_intro.cpp or any other example.
|
||||
*/
|
||||
template <class Logger, class CompletionToken>
|
||||
auto async_run_one(config const& cfg, Logger l, CompletionToken token)
|
||||
auto async_run(config const& cfg, Logger l, CompletionToken token)
|
||||
{
|
||||
runner_.set_config(cfg);
|
||||
l.set_prefix(runner_.get_config().log_prefix);
|
||||
return runner_.async_run(*this, l, std::move(token));
|
||||
}
|
||||
|
||||
/// Sets the maximum size of the read buffer.
|
||||
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
|
||||
{max_read_size_ = max_read_size;}
|
||||
|
||||
/** @brief Reserve memory on the read and write internal buffers.
|
||||
*
|
||||
* This function will call `std::string::reserve` on the
|
||||
* underlying buffers.
|
||||
*
|
||||
* @param read The new capacity of the read buffer.
|
||||
* @param write The new capacity of the write buffer.
|
||||
*/
|
||||
void reserve(std::size_t read, std::size_t write)
|
||||
{
|
||||
read_buffer_.reserve(read);
|
||||
@@ -1024,6 +933,6 @@ private:
|
||||
std::size_t max_read_size_ = (std::numeric_limits<std::size_t>::max)();
|
||||
};
|
||||
|
||||
} // boost::redis
|
||||
} // boost::redis::detail
|
||||
|
||||
#endif // BOOST_REDIS_CONNECTION_BASE_HPP
|
||||
Reference in New Issue
Block a user