2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Merge pull request #116 from boostorg/113-create-an-experimental-connection-class-that-has-fast-compilation-times

113 create an experimental connection class that has fast compilation times
This commit is contained in:
Marcelo
2023-06-24 09:52:48 +02:00
committed by GitHub
17 changed files with 331 additions and 217 deletions

View File

@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.14)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time")
#set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_COMMAND} -E time")
project(
boost_redis

117
README.md
View File

@@ -1,41 +1,30 @@
# boost_redis
Boost.Redis is a [Redis](https://redis.io/) client library built on top of
Boost.Redis is a high-level [Redis](https://redis.io/) client library built on top of
[Boost.Asio](https://www.boost.org/doc/libs/release/doc/html/boost_asio.html)
that implements Redis plain text protocol
[RESP3](https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md).
It can multiplex any number of client
requests, responses, and server pushes onto a single active socket
connection to the Redis server. The library hides low-level code away
from the user, which, in the majority of the cases will be concerned
with only three library entities
* `boost::redis::connection`: A full-duplex connection to the Redis
server with high-level functions to execute Redis commands, receive
server pushes and automatic command [pipelines](https://redis.io/docs/manual/pipelining/).
* `boost::redis::request`: A container of Redis commands that supports
STL containers and user defined data types.
* `boost::redis::response`: Container of Redis responses.
In the next sections we will cover all these points in detail with
examples. The requirements for using Boost.Redis are
connection to the Redis server. The requirements for using Boost.Redis are
* Boost 1.81 or greater.
* C++17 minimum.
* Redis 6 or higher (must support RESP3).
* Gcc (10, 11, 12), Clang (11, 13, 14) and Visual Studio (16 2019, 17 2022).
* Have basic-level knowledge about Redis and understand Asio and its asynchronous model.
* Have basic-level knowledge about [Redis](https://redis.io/docs/)
and [Boost.Asio](https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/overview.html).
To install Boost.Redis download the latest release on
https://github.com/boostorg/redis/releases. Boost.Redis is a header only
library, so you can starting using it right away by adding the
`include` subdirectory to your project and including
The latest release can be downloaded on
https://github.com/boostorg/redis/releases. The library headers can be
found in the `include` subdirectory and a compilation of the source
```cpp
#include <boost/redis/src.hpp>
```
in no more than one source file in your applications. To build the
is required. The simplest way to do it is to included this header in
no more than one source file in your applications. To build the
examples and tests cmake is supported, for example
```cpp
@@ -45,21 +34,10 @@ $ BOOST_ROOT=/opt/boost_1_81_0 cmake --preset g++-11
# Windows
$ cmake -G "Visual Studio 17 2022" -A x64 -B bin64 -DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake
```
<a name="connection"></a>
## Connection
Readers that are not familiar with Redis are advised to learn more about
it on https://redis.io/docs/ before we start, in essence
> Redis is an open source (BSD licensed), in-memory data structure
> store used as a database, cache, message broker, and streaming
> engine. Redis provides data structures such as strings, hashes,
> lists, sets, sorted sets with range queries, bitmaps, hyperloglogs,
> geospatial indexes, and streams. Redis has built-in replication, Lua
> scripting, LRU eviction, transactions, and different levels of
> on-disk persistence, and provides high availability via Redis
> Sentinel and automatic partitioning with Redis Cluster.
Let us start with a simple application that uses a short-lived
connection to send a [ping](https://redis.io/commands/ping/) command
to Redis
@@ -78,7 +56,7 @@ auto co_main(config const& cfg) -> net::awaitable<void>
response<std::string> resp;
// Executes the request.
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
std::cout << "PING: " << std::get<0>(resp).value() << std::endl;
@@ -87,12 +65,12 @@ auto co_main(config const& cfg) -> net::awaitable<void>
The roles played by the `async_run` and `async_exec` functions are
* `connection::async_exec`: Execute the commands contained in the
* `async_exec`: Execute the commands contained in the
request and store the individual responses in the `resp` object. Can
be called from multiple places in your code concurrently.
* `connection::async_run`: Resolve, connect, ssl-handshake,
resp3-handshake, health-checks reconnection and coordinate low-level
read and write operations..
* `async_run`: Resolve, connect, ssl-handshake,
resp3-handshake, health-checks, reconnection and coordinate low-level
read and write operations (among other things).
### Server pushes
@@ -114,22 +92,22 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");
while (!conn->is_cancelled()) {
// Loop while reconnection is enabled
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
// Loop reading Redis pushs messages.
// Loop reading Redis pushes.
for (generic_response resp;;) {
error_code ec;
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
std::cout
<< resp.value().at(1).value
<< " " << resp.value().at(2).value
<< " " << resp.value().at(3).value
<< std::endl;
// Use the response resp in some way and then clear it.
...
resp.value().clear();
}
}
@@ -167,22 +145,21 @@ req.push_range("HSET", "key", map);
Sending a request to Redis is performed with `boost::redis::connection::async_exec` as already stated.
<a name="responses"></a>
### Config flags
The `boost::redis::request::config` object inside the request dictates how the
`boost::redis::connection` should handle the request in some important situations. The
reader is advised to read it carefully.
<a name="responses"></a>
## Responses
Boost.Redis uses the following strategy to support Redis responses
* **Static**: For `boost::redis::request` whose sizes are known at compile time use the `response` type.
* `boost::redis::request` is used for requests whose number of commands are not dynamic.
* **Dynamic**: Otherwise use `boost::redis::generic_response`.
For example, below is a request with a compile time size
For example, the request below has three commands
```cpp
request req;
@@ -191,18 +168,19 @@ req.push("INCR", "key");
req.push("QUIT");
```
To read the response to this request users can use the following tuple
and its response also has three comamnds and can be read in the
following response object
```cpp
response<std::string, int, std::string>
```
The pattern might have become apparent to the reader: the tuple must
The response behaves as a tuple and must
have as many elements as the request has commands (exceptions below).
It is also necessary that each tuple element is capable of storing the
response to the command it refers to, otherwise an error will occur.
To ignore responses to individual commands in the request use the tag
`boost::redis::ignore_t`
`boost::redis::ignore_t`, for example
```cpp
// Ignore the second and last responses.
@@ -266,18 +244,14 @@ response<
Where both are passed to `async_exec` as showed elsewhere
```cpp
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
```
If the intention is to ignore the response to all commands altogether
use `ignore`
If the intention is to ignore responses altogether use `ignore`
```cpp
// Ignores the response
co_await conn->async_exec(req, ignore);
// The default response argument will also ignore responses.
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
```
Responses that contain nested aggregates or heterogeneous data
@@ -294,7 +268,7 @@ Commands that have no response like
* `"PSUBSCRIBE"`
* `"UNSUBSCRIBE"`
must be **NOT** be included in the response tuple. For example, the request below
must **NOT** be included in the response tuple. For example, the request below
```cpp
request req;
@@ -304,7 +278,7 @@ req.push("QUIT");
```
must be read in this tuple `response<std::string, std::string>`,
that has size two.
that has static size two.
### Null
@@ -320,17 +294,17 @@ response<
...
> resp;
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
```
Everything else stays pretty much the same.
### Transactions
To read responses to transactions we must first observe that Redis will
queue the transaction commands and send their individual responses as elements
of an array, the array is itself the response to the `EXEC` command.
For example, to read the response to this request
To read responses to transactions we must first observe that Redis
will queue the transaction commands and send their individual
responses as elements of an array, the array is itself the response to
the `EXEC` command. For example, to read the response to this request
```cpp
req.push("MULTI");
@@ -360,7 +334,7 @@ response<
exec_resp_type, // exec
> resp;
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
```
For a complete example see cpp20_containers.cpp.
@@ -408,7 +382,7 @@ using other types
```cpp
// Receives any RESP3 simple or aggregate data type.
boost::redis::generic_response resp;
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
```
For example, suppose we want to retrieve a hash data structure
@@ -460,9 +434,8 @@ The examples below show how to use the features discussed so far
* cpp17_intro.cpp: Uses callbacks and requires C++17.
* cpp17_intro_sync.cpp: Runs `async_run` in a separate thread and performs synchronous calls to `async_exec`.
To avoid repetition code that is common to some examples has been
grouped in common.hpp. The main function used in some async examples
has been factored out in the main.cpp file.
The main function used in some async examples has been factored out in
the main.cpp file.
## Echo server benchmark
@@ -701,7 +674,7 @@ https://lists.boost.org/Archives/boost/2023/01/253944.php.
## Changelog
### master (incorporates changes to conform the boost review and more)
### develop (incorporates changes to conform the boost review and more)
* Adds Redis stream example.

View File

@@ -19,11 +19,12 @@
namespace net = boost::asio;
using stream_descriptor = net::deferred_t::as_default_on_t<net::posix::stream_descriptor>;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::ignore;
using net::redirect_error;
using net::use_awaitable;
using boost::system::error_code;
@@ -41,7 +42,7 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Subscribe to channels.
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
// Loop reading Redis push messages.
for (generic_response resp;;) {
@@ -66,7 +67,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
auto n = co_await net::async_read_until(*in, net::dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
msg.erase(0, n);
}
}

View File

@@ -20,7 +20,7 @@ using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::ignore;
using boost::redis::config;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
void print(std::map<std::string, std::string> const& cont)
{
@@ -47,7 +47,7 @@ auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
}
auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -60,7 +60,7 @@ auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
response<std::map<std::string, std::string>> resp;
// Executes the request and reads the response.
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
print(std::get<0>(resp).value());
}
@@ -81,7 +81,7 @@ auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
response<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>> // exec
> resp;
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
print(std::get<0>(std::get<3>(resp).value()).value().value());
print(std::get<1>(std::get<3>(resp).value()).value().value());

View File

@@ -18,11 +18,11 @@ namespace net = boost::asio;
using tcp_socket = net::deferred_t::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::deferred_t::as_default_on_t<net::ip::tcp::acceptor>;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::system::error_code;
using boost::redis::connection;
using namespace std::chrono_literals;
auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -33,7 +33,7 @@ auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) ->
for (std::string buffer;;) {
auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n");
req.push("PING", buffer);
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
co_await net::async_write(socket, net::buffer(std::get<0>(resp).value()));
std::get<0>(resp).value().clear();
req.clear();

View File

@@ -17,8 +17,7 @@ namespace net = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::logger;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
@@ -34,7 +33,7 @@ auto co_main(config cfg) -> net::awaitable<void>
response<std::string> resp;
// Executes the request.
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
std::cout << "PING: " << std::get<0>(resp).value() << std::endl;

View File

@@ -18,7 +18,7 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::logger;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
auto verify_certificate(bool, net::ssl::verify_context&) -> bool
{
@@ -45,7 +45,7 @@ auto co_main(config cfg) -> net::awaitable<void>
conn->next_layer().set_verify_mode(net::ssl::verify_peer);
conn->next_layer().set_verify_callback(verify_certificate);
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
std::cout << "Response: " << std::get<0>(resp).value() << std::endl;

View File

@@ -29,7 +29,7 @@ using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::config;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
// Struct that will be stored in Redis using json serialization.
struct user {
@@ -64,7 +64,7 @@ auto co_main(config cfg) -> net::awaitable<void>
response<ignore_t, user> resp;
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
// Prints the first ping

View File

@@ -25,7 +25,7 @@ using boost::redis::response;
using boost::redis::operation;
using boost::redis::ignore_t;
using boost::redis::config;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
// The protobuf type described in examples/person.proto
using tutorial::person;
@@ -76,7 +76,7 @@ net::awaitable<void> co_main(config cfg)
response<ignore_t, person> resp;
// Sends the request and receives the response.
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
conn->cancel();
std::cout

View File

@@ -19,7 +19,7 @@ using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::config;
using boost::redis::address;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
auto redir(boost::system::error_code& ec)
{ return net::redirect_error(net::use_awaitable, ec); }

View File

@@ -25,7 +25,7 @@ using boost::redis::config;
using boost::redis::generic_response;
using boost::redis::operation;
using boost::redis::request;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
@@ -39,7 +39,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
for (;;) {
req.push("XREAD", "BLOCK", "0", "STREAMS", "test-topic", stream_id);
co_await conn->async_exec(req, resp);
co_await conn->async_exec(req, resp, net::deferred);
// std::cout << "Response: ";
// for (int i = 0; i < resp->value().size(); ++i) {

View File

@@ -24,8 +24,9 @@ using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::logger;
using boost::redis::config;
using boost::redis::ignore;
using boost::system::error_code;
using connection = net::deferred_t::as_default_on_t<boost::redis::connection>;
using boost::redis::connection;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
/* This example will subscribe and read pushes indefinitely.
@@ -51,10 +52,11 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");
// Loop while reconnection is enabled
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req);
co_await conn->async_exec(req, ignore, net::deferred);
// Loop reading Redis pushs messages.
for (generic_response resp;;) {

View File

@@ -7,17 +7,17 @@
#ifndef BOOST_REDIS_CONNECTION_HPP
#define BOOST_REDIS_CONNECTION_HPP
#include <boost/redis/connection_base.hpp>
#include <boost/redis/detail/connection_base.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/config.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/any_completion_handler.hpp>
#include <chrono>
#include <memory>
#include <iostream>
namespace boost::redis {
namespace detail
@@ -34,7 +34,7 @@ struct reconnection_op {
BOOST_ASIO_CORO_REENTER (coro_) for (;;)
{
BOOST_ASIO_CORO_YIELD
conn_->async_run_one(conn_->cfg_, logger_, std::move(self));
conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self));
conn_->cancel(operation::receive);
logger_.on_connection_lost(ec);
if (!conn_->will_reconnect() || is_cancelled(self)) {
@@ -68,14 +68,15 @@ struct reconnection_op {
*
*/
template <class Executor>
class basic_connection : public connection_base<Executor> {
class basic_connection {
public:
using base_type = connection_base<Executor>;
using this_type = basic_connection<Executor>;
/// Executor type.
using executor_type = Executor;
/// Returns the underlying executor.
executor_type get_executor() noexcept
{ return impl_.get_executor(); }
/// Rebinds the socket type to another executor.
template <class Executor1>
struct rebind_executor
@@ -87,7 +88,7 @@ public:
/// Contructs from an executor.
explicit
basic_connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client)
: base_type{ex, method}
: impl_{ex, method}
, timer_{ex}
{ }
@@ -97,12 +98,28 @@ public:
: basic_connection(ioc.get_executor(), method)
{ }
/** @brief High-level connection to Redis.
/** @brief Starts underlying connection operations.
*
* This connection class adds reconnection functionality to
* `boost::redis::connection_base::async_run_one`. When a
* connection is lost for any reason, a new one is stablished
* automatically. To disable reconnection call
* This member function provides the following functionality
*
* 1. Resolve the address passed on `boost::redis::config::addr`.
* 2. Connect to one of the results obtained in the resolve operation.
* 3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`.
* 4. Start a health-check operation where ping commands are sent
* at intervals specified in
* `boost::redis::config::health_check_interval`. The message passed to
* `PING` will be `boost::redis::config::health_check_id`. Passing a
* timeout with value zero will disable health-checks. If the Redis
* server does not respond to a health-check within two times the value
* specified here, it will be considered unresponsive and the connection
* will be closed and a new connection will be stablished.
* 5. Starts read and write operations with the Redis
* server. More specifically it will trigger the write of all
* requests i.e. calls to `async_exec` that happened prior to this
* call.
*
* When a connection is lost for any reason, a new one is
* stablished automatically. To disable reconnection call
* `boost::redis::connection::cancel(operation::reconnection)`.
*
* @param cfg Configuration paramters.
@@ -115,11 +132,6 @@ public:
* void f(system::error_code);
* @endcode
*
* @remarks
*
* * This function will complete only if reconnection was disabled
* and the connection is lost.
*
* For example on how to call this function refer to
* cpp20_intro.cpp or any other example.
*/
@@ -132,6 +144,8 @@ public:
Logger l = Logger{},
CompletionToken token = CompletionToken{})
{
using this_type = basic_connection<executor_type>;
cfg_ = cfg;
l.set_prefix(cfg_.log_prefix);
return asio::async_compose
@@ -140,6 +154,77 @@ public:
>(detail::reconnection_op<this_type, Logger>{this, l}, token, timer_);
}
/** @brief Receives server side pushes asynchronously.
*
* When pushes arrive and there is no `async_receive` operation in
* progress, pushed data, requests, and responses will be paused
* until `async_receive` is called again. Apps will usually want
* to call `async_receive` in a loop.
*
* To cancel an ongoing receive operation apps should call
* `connection::cancel(operation::receive)`.
*
* @param response Response object.
* @param token Completion token.
*
* For an example see cpp20_subscriber.cpp. The completion token must
* have the following signature
*
* @code
* void f(system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the push received in
* bytes.
*/
template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>
>
auto
async_receive(
Response& response,
CompletionToken token = CompletionToken{})
{
return impl_.async_receive(response, token);
}
/** @brief Executes commands on the Redis server asynchronously.
*
* This function sends a request to the Redis server and waits for
* the responses to each individual command in the request. If the
* request contains only commands that don't expect a response,
* the completion occurs after it has been written to the
* underlying stream. Multiple concurrent calls to this function
* will be automatically queued by the implementation.
*
* @param req Request.
* @param resp Response.
* @param token Completion token.
*
* For an example see cpp20_echo_server.cpp. The completion token must
* have the following signature
*
* @code
* void f(system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the response received
* in bytes.
*/
template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>
>
auto
async_exec(
request const& req,
Response& resp = ignore,
CompletionToken token = CompletionToken{})
{
return impl_.async_exec(req, resp, token);
}
/** @brief Cancel operations.
*
* @li `operation::exec`: Cancels operations started with
@@ -152,7 +237,7 @@ public:
* @param op: The operation to be cancelled.
* @returns The number of operations that have been canceled.
*/
void cancel(operation op = operation::all) override
void cancel(operation op = operation::all)
{
switch (op) {
case operation::reconnection:
@@ -163,16 +248,51 @@ public:
default: /* ignore */;
}
base_type::cancel(op);
impl_.cancel(op);
}
/// Returns true if the connection was canceled.
bool will_reconnect() const noexcept
{ return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
private:
config cfg_;
/** @brief Reserve memory on the read and write internal buffers.
*
* This function will call `std::string::reserve` on the
* underlying buffers.
*
* @param read The new capacity of the read buffer.
* @param write The new capacity of the write buffer.
*/
void reserve(std::size_t read, std::size_t write)
{
impl_.reserve(read, write);
}
/// Sets the maximum size of the read buffer.
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
{ impl_.set_max_buffer_read_size(max_read_size); }
/// Returns the ssl context.
auto const& get_ssl_context() const noexcept
{ return impl_.get_ssl_context();}
/// Returns the ssl context.
auto& get_ssl_context() noexcept
{ return impl_.get_ssl_context();}
/// Resets the underlying stream.
void reset_stream()
{ impl_.reset_stream(); }
/// Returns a reference to the next layer.
auto& next_layer() noexcept
{ return impl_.next_layer(); }
/// Returns a const reference to the next layer.
auto const& next_layer() const noexcept
{ return impl_.next_layer(); }
private:
using timer_type =
asio::basic_waitable_timer<
std::chrono::steady_clock,
@@ -181,13 +301,89 @@ private:
template <class, class> friend struct detail::reconnection_op;
config cfg_;
detail::connection_base<executor_type> impl_;
timer_type timer_;
};
/** \brief A connection that uses the asio::any_io_executor.
/** \brief A basic_connection that type erases the executor.
* \ingroup high-level-api
*
* This connection type uses the asio::any_io_executor and
* asio::any_completion_token to reduce compilation times.
*
* For documentaiton of each member function see
* `boost::redis::basic_connection`.
*/
using connection = basic_connection<asio::any_io_executor>;
class connection {
public:
/// Executor type.
using executor_type = asio::any_io_executor;
/// Contructs from an executor.
explicit connection(executor_type ex, asio::ssl::context::method method = asio::ssl::context::tls_client);
/// Contructs from a context.
explicit connection(asio::io_context& ioc, asio::ssl::context::method method = asio::ssl::context::tls_client);
/// Returns the underlying executor.
executor_type get_executor() noexcept
{ return impl_.get_executor(); }
/// Calls `boost::redis::basic_connection::async_run`.
template <class CompletionToken>
auto async_run(config const& cfg, logger l, CompletionToken token)
{
return asio::async_initiate<
CompletionToken, void(boost::system::error_code)>(
[](auto handler, connection* self, config const* cfg, logger l)
{
self->async_run_impl(*cfg, l, std::move(handler));
}, token, this, &cfg, l);
}
/// Calls `boost::redis::basic_connection::async_receive`.
template <class Response, class CompletionToken>
auto async_receive(Response& response, CompletionToken token)
{
return impl_.async_receive(response, std::move(token));
}
/// Calls `boost::redis::basic_connection::async_exec`.
template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
{
return impl_.async_exec(req, resp, std::move(token));
}
/// Calls `boost::redis::basic_connection::cancel`.
void cancel(operation op = operation::all);
/// Calls `boost::redis::basic_connection::will_reconnect`.
bool will_reconnect() const noexcept
{ return impl_.will_reconnect();}
/// Calls `boost::redis::basic_connection::next_layer`.
auto& next_layer() noexcept
{ return impl_.next_layer(); }
/// Calls `boost::redis::basic_connection::next_layer`.
auto const& next_layer() const noexcept
{ return impl_.next_layer(); }
/// Calls `boost::redis::basic_connection::reset_stream`.
void reset_stream()
{ impl_.reset_stream();}
private:
void
async_run_impl(
config const& cfg,
logger l,
asio::any_completion_handler<void(boost::system::error_code)> token);
basic_connection<executor_type> impl_;
};
} // boost::redis

View File

@@ -37,8 +37,7 @@
#include <string_view>
#include <type_traits>
namespace boost::redis {
namespace detail {
namespace boost::redis::detail {
template <class Conn>
struct wait_receive_op {
@@ -473,7 +472,6 @@ struct reader_op {
}
}
};
} // detail
/** @brief Base class for high level Redis asynchronous connections.
* @ingroup high-level-api
@@ -549,29 +547,6 @@ public:
cancel_impl(op);
}
/** @brief Executes commands on the Redis server asynchronously.
*
* This function sends a request to the Redis server and waits for
* the responses to each individual command in the request. If the
* request contains only commands that don't expect a response,
* the completion occurs after it has been written to the
* underlying stream. Multiple concurrent calls to this function
* will be automatically queued by the implementation.
*
* @param req Request.
* @param resp Response.
* @param token Completion token.
*
* For an example see cpp20_echo_server.cpp. The completion token must
* have the following signature
*
* @code
* void f(system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the response received
* in bytes.
*/
template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>
@@ -592,29 +567,6 @@ public:
>(redis::detail::exec_op<this_type, decltype(f)>{this, &req, f}, token, writer_timer_);
}
/** @brief Receives server side pushes asynchronously.
*
* When pushes arrive and there is no `async_receive` operation in
* progress, pushed data, requests, and responses will be paused
* until `async_receive` is called again. Apps will usually want
* to call `async_receive` in a loop.
*
* To cancel an ongoing receive operation apps should call
* `connection::cancel(operation::receive)`.
*
* @param response Response object.
* @param token Completion token.
*
* For an example see cpp20_subscriber.cpp. The completion token must
* have the following signature
*
* @code
* void f(system::error_code, std::size_t);
* @endcode
*
* Where the second parameter is the size of the push received in
* bytes.
*/
template <
class Response = ignore_t,
class CompletionToken = asio::default_completion_token_t<executor_type>
@@ -634,60 +586,17 @@ public:
>(redis::detail::receive_op<this_type, decltype(f)>{this, f}, token, read_op_timer_);
}
/** @brief Starts underlying connection operations.
*
* Provides a high-level connection to the Redis server. It will
* perform the following steps
*
* 1. Resolve the address passed on `boost::redis::config::addr`.
* 2. Connect to one of the results obtained in the resolve operation.
* 3. Send a [HELLO](https://redis.io/commands/hello/) command where each of its parameters are read from `cfg`.
* 4. Start a health-check operation where ping commands are sent
* at intervals specified in
* `boost::redis::config::health_check_interval`. The message passed to
* `PING` will be `boost::redis::config::health_check_id`. Passing a
* timeout with value zero will disable health-checks. If the Redis
* server does not respond to a health-check within two times the value
* specified here, it will be considered unresponsive and the connection
* will be closed and a new connection will be stablished.
* 5. Starts read and write operations with the Redis
* server. More specifically it will trigger the write of all
* requests i.e. calls to `async_exec` that happened prior to this
* call.
*
* @param cfg Configuration paramters.
* @param l Logger object. The interface expected is specified in the class `boost::redis::logger`.
* @param token Completion token.
*
* The completion token must have the following signature
*
* @code
* void f(system::error_code);
* @endcode
*
* For example on how to call this function refer to
* cpp20_intro.cpp or any other example.
*/
template <class Logger, class CompletionToken>
auto async_run_one(config const& cfg, Logger l, CompletionToken token)
auto async_run(config const& cfg, Logger l, CompletionToken token)
{
runner_.set_config(cfg);
l.set_prefix(runner_.get_config().log_prefix);
return runner_.async_run(*this, l, std::move(token));
}
/// Sets the maximum size of the read buffer.
void set_max_buffer_read_size(std::size_t max_read_size) noexcept
{max_read_size_ = max_read_size;}
/** @brief Reserve memory on the read and write internal buffers.
*
* This function will call `std::string::reserve` on the
* underlying buffers.
*
* @param read The new capacity of the read buffer.
* @param write The new capacity of the write buffer.
*/
void reserve(std::size_t read, std::size_t write)
{
read_buffer_.reserve(read);
@@ -1024,6 +933,6 @@ private:
std::size_t max_read_size_ = (std::numeric_limits<std::size_t>::max)();
};
} // boost::redis
} // boost::redis::detail
#endif // BOOST_REDIS_CONNECTION_BASE_HPP

View File

@@ -0,0 +1,33 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
namespace boost::redis {
connection::connection(executor_type ex, asio::ssl::context::method method)
: impl_{ex, method}
{ }
connection::connection(asio::io_context& ioc, asio::ssl::context::method method)
: impl_(ioc.get_executor(), method)
{ }
void
connection::async_run_impl(
config const& cfg,
logger l,
asio::any_completion_handler<void(boost::system::error_code)> token)
{
impl_.async_run(cfg, l, std::move(token));
}
void connection::cancel(operation op)
{
impl_.cancel(op);
}
} // namespace boost::redis

View File

@@ -8,6 +8,7 @@
#include <boost/redis/impl/logger.ipp>
#include <boost/redis/impl/request.ipp>
#include <boost/redis/impl/ignore.ipp>
#include <boost/redis/impl/connection.ipp>
#include <boost/redis/resp3/impl/type.ipp>
#include <boost/redis/resp3/impl/parser.ipp>
#include <boost/redis/resp3/impl/serialization.ipp>

View File

@@ -30,10 +30,10 @@ using boost::redis::ignore;
using boost::redis::logger;
using boost::redis::config;
using boost::redis::operation;
using boost::redis::connection;
using boost::system::error_code;
using boost::asio::use_awaitable;
using boost::asio::redirect_error;
using connection = boost::asio::use_awaitable_t<>::as_default_on_t<boost::redis::connection>;
using namespace std::chrono_literals;
// Push consumer