mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
committed by
GitHub
parent
89a42dbf74
commit
7304d99bf6
7
.github/workflows/ci.yml
vendored
7
.github/workflows/ci.yml
vendored
@@ -196,6 +196,13 @@ jobs:
|
||||
build-type: 'Debug'
|
||||
cxxflags: '-fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all'
|
||||
ldflags: '-fsanitize=address -fsanitize=undefined'
|
||||
|
||||
- toolset: gcc-14
|
||||
install: 'g++-14'
|
||||
container: ubuntu:24.04
|
||||
cxxstd: '23'
|
||||
build-type: 'Debug'
|
||||
cxxflags: '-DBOOST_ASIO_DISABLE_LOCAL_SOCKETS=1' # If a system had no UNIX socket support, we build correctly
|
||||
|
||||
- toolset: gcc-14
|
||||
install: 'g++-14'
|
||||
|
||||
@@ -414,6 +414,7 @@ The examples below show how to use the features discussed so far
|
||||
|
||||
* cpp20_intro.cpp: Does not use awaitable operators.
|
||||
* cpp20_intro_tls.cpp: Communicates over TLS.
|
||||
* cpp20_unix_sockets.cpp: Communicates over UNIX domain sockets.
|
||||
* cpp20_containers.cpp: Shows how to send and receive STL containers and how to use transactions.
|
||||
* cpp20_json.cpp: Shows how to serialize types using Boost.Json.
|
||||
* cpp20_protobuf.cpp: Shows how to serialize types using protobuf.
|
||||
|
||||
@@ -29,6 +29,7 @@ make_testable_example(cpp20_intro 20)
|
||||
make_testable_example(cpp20_containers 20)
|
||||
make_testable_example(cpp20_json 20)
|
||||
make_testable_example(cpp20_intro_tls 20)
|
||||
make_testable_example(cpp20_unix_sockets 20)
|
||||
|
||||
make_example(cpp20_subscriber 20)
|
||||
make_example(cpp20_streams 20)
|
||||
|
||||
60
example/cpp20_unix_sockets.cpp
Normal file
60
example/cpp20_unix_sockets.cpp
Normal file
@@ -0,0 +1,60 @@
|
||||
//
|
||||
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
|
||||
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
|
||||
//
|
||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
#include <boost/asio/awaitable.hpp>
|
||||
|
||||
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
|
||||
|
||||
#include <boost/redis/connection.hpp>
|
||||
|
||||
#include <boost/asio/consign.hpp>
|
||||
#include <boost/asio/detached.hpp>
|
||||
#include <boost/asio/this_coro.hpp>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace asio = boost::asio;
|
||||
using boost::redis::request;
|
||||
using boost::redis::response;
|
||||
using boost::redis::config;
|
||||
using boost::redis::logger;
|
||||
using boost::redis::connection;
|
||||
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
|
||||
auto co_main(config cfg) -> asio::awaitable<void>
|
||||
{
|
||||
// If unix_socket is set to a non-empty string, UNIX domain sockets will be used
|
||||
// instead of TCP. Set this value to the path where your server is listening.
|
||||
// UNIX domain socket connections work in the same way as TCP connections.
|
||||
cfg.unix_socket = "/tmp/redis-socks/redis.sock";
|
||||
|
||||
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
|
||||
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
|
||||
|
||||
request req;
|
||||
req.push("PING");
|
||||
|
||||
response<std::string> resp;
|
||||
|
||||
co_await conn->async_exec(req, resp);
|
||||
conn->cancel();
|
||||
|
||||
std::cout << "Response: " << std::get<0>(resp).value() << std::endl;
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
auto co_main(config) -> asio::awaitable<void>
|
||||
{
|
||||
std::cout << "Sorry, your system does not support UNIX domain sockets\n";
|
||||
co_return;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#endif
|
||||
@@ -34,6 +34,11 @@ struct config {
|
||||
/// Address of the Redis server.
|
||||
address addr = address{"127.0.0.1", "6379"};
|
||||
|
||||
/// The UNIX domain socket path where the server is listening. If non-empty,
|
||||
/// communication with the server will happen using UNIX domain sockets, and addr will be ignored.
|
||||
/// UNIX domain sockets can't be used with SSL: if `unix_socket` is non-empty, `use_ssl` must be false.
|
||||
std::string unix_socket;
|
||||
|
||||
/** @brief Username passed to the
|
||||
* [HELLO](https://redis.io/commands/hello/) command. If left
|
||||
* empty `HELLO` will be sent without authentication parameters.
|
||||
|
||||
@@ -290,12 +290,25 @@ struct reader_op {
|
||||
}
|
||||
};
|
||||
|
||||
inline system::error_code check_config(const config& cfg)
|
||||
{
|
||||
if (!cfg.unix_socket.empty()) {
|
||||
#ifndef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
return error::unix_sockets_unsupported;
|
||||
#endif
|
||||
if (cfg.use_ssl)
|
||||
return error::unix_sockets_ssl_unsupported;
|
||||
}
|
||||
return system::error_code{};
|
||||
}
|
||||
|
||||
template <class Conn, class Logger>
|
||||
class run_op {
|
||||
private:
|
||||
Conn* conn_ = nullptr;
|
||||
Logger logger_;
|
||||
asio::coroutine coro_{};
|
||||
system::error_code stored_ec_;
|
||||
|
||||
using order_t = std::array<std::size_t, 5>;
|
||||
|
||||
@@ -321,75 +334,87 @@ public:
|
||||
system::error_code ec3 = {},
|
||||
system::error_code = {})
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER(coro_) for (;;)
|
||||
BOOST_ASIO_CORO_REENTER(coro_)
|
||||
{
|
||||
// Try to connect
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn_->stream_.async_connect(&conn_->cfg_, logger_, std::move(self));
|
||||
|
||||
// If we failed, try again
|
||||
// Check config
|
||||
ec0 = check_config(conn_->cfg_);
|
||||
if (ec0) {
|
||||
self.complete(ec0);
|
||||
logger_.log_error("Invalid configuration", ec0);
|
||||
stored_ec_ = ec0;
|
||||
BOOST_ASIO_CORO_YIELD asio::async_immediate(self.get_io_executor(), std::move(self));
|
||||
self.complete(stored_ec_);
|
||||
return;
|
||||
}
|
||||
|
||||
conn_->mpx_.reset();
|
||||
for (;;) {
|
||||
// Try to connect
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn_->stream_.async_connect(&conn_->cfg_, logger_, std::move(self));
|
||||
|
||||
// Note: Order is important here because the writer might
|
||||
// trigger an async_write before the async_hello thereby
|
||||
// causing an authentication problem.
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
asio::experimental::make_parallel_group(
|
||||
[this](auto token) {
|
||||
return conn_->handshaker_.async_hello(*conn_, logger_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->health_checker_.async_ping(*conn_, logger_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->reader(logger_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->writer(logger_, token);
|
||||
})
|
||||
.async_wait(asio::experimental::wait_for_one_error(), std::move(self));
|
||||
// If we failed, try again
|
||||
if (ec0) {
|
||||
self.complete(ec0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (order[0] == 0 && !!ec0) {
|
||||
self.complete(ec0);
|
||||
return;
|
||||
}
|
||||
conn_->mpx_.reset();
|
||||
|
||||
if (order[0] == 2 && ec2 == error::pong_timeout) {
|
||||
self.complete(ec1);
|
||||
return;
|
||||
}
|
||||
// Note: Order is important here because the writer might
|
||||
// trigger an async_write before the async_hello thereby
|
||||
// causing an authentication problem.
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
asio::experimental::make_parallel_group(
|
||||
[this](auto token) {
|
||||
return conn_->handshaker_.async_hello(*conn_, logger_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->health_checker_.async_ping(*conn_, logger_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->health_checker_.async_check_timeout(*conn_, logger_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->reader(logger_, token);
|
||||
},
|
||||
[this](auto token) {
|
||||
return conn_->writer(logger_, token);
|
||||
})
|
||||
.async_wait(asio::experimental::wait_for_one_error(), std::move(self));
|
||||
|
||||
// The receive operation must be cancelled because channel
|
||||
// subscription does not survive a reconnection but requires
|
||||
// re-subscription.
|
||||
conn_->cancel(operation::receive);
|
||||
if (order[0] == 0 && !!ec0) {
|
||||
self.complete(ec0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!conn_->will_reconnect()) {
|
||||
conn_->cancel(operation::reconnection);
|
||||
self.complete(ec3);
|
||||
return;
|
||||
}
|
||||
if (order[0] == 2 && ec2 == error::pong_timeout) {
|
||||
self.complete(ec1);
|
||||
return;
|
||||
}
|
||||
|
||||
conn_->reconnect_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
|
||||
// The receive operation must be cancelled because channel
|
||||
// subscription does not survive a reconnection but requires
|
||||
// re-subscription.
|
||||
conn_->cancel(operation::receive);
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn_->reconnect_timer_.async_wait(asio::prepend(std::move(self), order_t{}));
|
||||
if (ec0) {
|
||||
self.complete(ec0);
|
||||
return;
|
||||
}
|
||||
if (!conn_->will_reconnect()) {
|
||||
conn_->cancel(operation::reconnection);
|
||||
self.complete(ec3);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!conn_->will_reconnect()) {
|
||||
self.complete(asio::error::operation_aborted);
|
||||
return;
|
||||
conn_->reconnect_timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn_->reconnect_timer_.async_wait(asio::prepend(std::move(self), order_t{}));
|
||||
if (ec0) {
|
||||
self.complete(ec0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!conn_->will_reconnect()) {
|
||||
self.complete(asio::error::operation_aborted);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -753,9 +778,7 @@ private:
|
||||
writer_timer_);
|
||||
}
|
||||
|
||||
auto is_open() const noexcept { return stream_.is_open(); }
|
||||
|
||||
[[nodiscard]] bool trigger_write() const noexcept { return is_open() && !mpx_.is_writing(); }
|
||||
bool is_open() const noexcept { return stream_.is_open(); }
|
||||
|
||||
detail::redis_stream<Executor> stream_;
|
||||
|
||||
|
||||
@@ -17,31 +17,56 @@
|
||||
#include <boost/asio/coroutine.hpp>
|
||||
#include <boost/asio/ip/basic_resolver.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/local/stream_protocol.hpp>
|
||||
#include <boost/asio/ssl/context.hpp>
|
||||
#include <boost/asio/ssl/stream.hpp>
|
||||
#include <boost/asio/ssl/stream_base.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include "boost/system/detail/error_code.hpp"
|
||||
|
||||
#include <utility>
|
||||
|
||||
namespace boost {
|
||||
namespace redis {
|
||||
namespace detail {
|
||||
|
||||
// What transport is redis_stream using?
|
||||
enum class transport_type
|
||||
{
|
||||
tcp, // plaintext TCP
|
||||
tcp_tls, // TLS over TCP
|
||||
unix_socket, // UNIX domain sockets
|
||||
};
|
||||
|
||||
template <class Executor>
|
||||
class redis_stream {
|
||||
asio::ssl::context ssl_ctx_;
|
||||
asio::ip::basic_resolver<asio::ip::tcp, Executor> resolv_;
|
||||
typename asio::steady_timer::template rebind_executor<Executor>::other timer_;
|
||||
asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>> stream_;
|
||||
bool ssl_stream_used_{};
|
||||
bool use_ssl_{};
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
asio::basic_stream_socket<asio::local::stream_protocol, Executor> unix_socket_;
|
||||
#endif
|
||||
typename asio::steady_timer::template rebind_executor<Executor>::other timer_;
|
||||
|
||||
transport_type transport_{transport_type::tcp};
|
||||
bool ssl_stream_used_{false};
|
||||
|
||||
void reset_stream() { stream_ = {resolv_.get_executor(), ssl_ctx_}; }
|
||||
|
||||
static transport_type transport_from_config(const config& cfg)
|
||||
{
|
||||
if (cfg.unix_socket.empty()) {
|
||||
if (cfg.use_ssl) {
|
||||
return transport_type::tcp_tls;
|
||||
} else {
|
||||
return transport_type::tcp;
|
||||
}
|
||||
} else {
|
||||
BOOST_ASSERT(!cfg.use_ssl);
|
||||
return transport_type::unix_socket;
|
||||
}
|
||||
}
|
||||
|
||||
template <class Logger>
|
||||
struct connect_op {
|
||||
redis_stream& obj;
|
||||
@@ -69,63 +94,83 @@ class redis_stream {
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER(coro)
|
||||
{
|
||||
// ssl::stream doesn't support being re-used. If we're to use
|
||||
// TLS and the stream has been used, re-create it.
|
||||
// Must be done before anything else is done on the stream
|
||||
if (cfg->use_ssl && obj.ssl_stream_used_)
|
||||
obj.reset_stream();
|
||||
// Record the transport that we will be using
|
||||
obj.transport_ = transport_from_config(*cfg);
|
||||
|
||||
// Resolve the server's address
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
obj.resolv_.async_resolve(
|
||||
cfg->addr.host,
|
||||
cfg->addr.port,
|
||||
asio::cancel_after(obj.timer_, cfg->resolve_timeout, std::move(self)));
|
||||
|
||||
// Log it
|
||||
lgr.on_resolve(ec, resolver_results);
|
||||
|
||||
// If this failed, we can't continue
|
||||
if (ec) {
|
||||
self.complete(ec == asio::error::operation_aborted ? error::resolve_timeout : ec);
|
||||
return;
|
||||
}
|
||||
|
||||
// Connect to the address that the resolver provided us
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
asio::async_connect(
|
||||
obj.stream_.next_layer(),
|
||||
std::move(resolver_results),
|
||||
asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
|
||||
|
||||
// Note: logging is performed in the specialized operator() function.
|
||||
// If this failed, we can't continue
|
||||
if (ec) {
|
||||
self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
|
||||
return;
|
||||
}
|
||||
|
||||
if (cfg->use_ssl) {
|
||||
// Mark the SSL stream as used
|
||||
obj.ssl_stream_used_ = true;
|
||||
|
||||
// If we were configured to use TLS, perform the handshake
|
||||
if (obj.transport_ == transport_type::unix_socket) {
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
// Directly connect to the socket
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
obj.stream_.async_handshake(
|
||||
asio::ssl::stream_base::client,
|
||||
asio::cancel_after(obj.timer_, cfg->ssl_handshake_timeout, std::move(self)));
|
||||
obj.unix_socket_.async_connect(
|
||||
cfg->unix_socket,
|
||||
asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
|
||||
|
||||
lgr.on_ssl_handshake(ec);
|
||||
// Log it
|
||||
lgr.on_connect(ec, cfg->unix_socket);
|
||||
|
||||
// If this failed, we can't continue
|
||||
if (ec) {
|
||||
self.complete(
|
||||
ec == asio::error::operation_aborted ? error::ssl_handshake_timeout : ec);
|
||||
self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
|
||||
return;
|
||||
}
|
||||
#else
|
||||
BOOST_ASSERT(false);
|
||||
#endif
|
||||
} else {
|
||||
// ssl::stream doesn't support being re-used. If we're to use
|
||||
// TLS and the stream has been used, re-create it.
|
||||
// Must be done before anything else is done on the stream
|
||||
if (cfg->use_ssl && obj.ssl_stream_used_)
|
||||
obj.reset_stream();
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
obj.resolv_.async_resolve(
|
||||
cfg->addr.host,
|
||||
cfg->addr.port,
|
||||
asio::cancel_after(obj.timer_, cfg->resolve_timeout, std::move(self)));
|
||||
|
||||
// Log it
|
||||
lgr.on_resolve(ec, resolver_results);
|
||||
|
||||
// If this failed, we can't continue
|
||||
if (ec) {
|
||||
self.complete(ec == asio::error::operation_aborted ? error::resolve_timeout : ec);
|
||||
return;
|
||||
}
|
||||
|
||||
// Record that we're using SSL
|
||||
obj.use_ssl_ = true;
|
||||
// Connect to the address that the resolver provided us
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
asio::async_connect(
|
||||
obj.stream_.next_layer(),
|
||||
std::move(resolver_results),
|
||||
asio::cancel_after(obj.timer_, cfg->connect_timeout, std::move(self)));
|
||||
|
||||
// Note: logging is performed in the specialized operator() function.
|
||||
// If this failed, we can't continue
|
||||
if (ec) {
|
||||
self.complete(ec == asio::error::operation_aborted ? error::connect_timeout : ec);
|
||||
return;
|
||||
}
|
||||
|
||||
if (cfg->use_ssl) {
|
||||
// Mark the SSL stream as used
|
||||
obj.ssl_stream_used_ = true;
|
||||
|
||||
// If we were configured to use TLS, perform the handshake
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
obj.stream_.async_handshake(
|
||||
asio::ssl::stream_base::client,
|
||||
asio::cancel_after(obj.timer_, cfg->ssl_handshake_timeout, std::move(self)));
|
||||
|
||||
lgr.on_ssl_handshake(ec);
|
||||
|
||||
// If this failed, we can't continue
|
||||
if (ec) {
|
||||
self.complete(
|
||||
ec == asio::error::operation_aborted ? error::ssl_handshake_timeout : ec);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Done
|
||||
@@ -138,8 +183,11 @@ public:
|
||||
explicit redis_stream(Executor ex, asio::ssl::context&& ssl_ctx)
|
||||
: ssl_ctx_{std::move(ssl_ctx)}
|
||||
, resolv_{ex}
|
||||
, timer_{ex}
|
||||
, stream_{std::move(ex), ssl_ctx_}
|
||||
, stream_{ex, ssl_ctx_}
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
, unix_socket_{ex}
|
||||
#endif
|
||||
, timer_{std::move(ex)}
|
||||
{ }
|
||||
|
||||
// Executor. Required to satisfy the AsyncStream concept
|
||||
@@ -148,7 +196,14 @@ public:
|
||||
|
||||
// Accessors
|
||||
const auto& get_ssl_context() const noexcept { return ssl_ctx_; }
|
||||
bool is_open() const { return stream_.next_layer().is_open(); }
|
||||
bool is_open() const
|
||||
{
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
if (transport_ == transport_type::unix_socket)
|
||||
return unix_socket_.is_open();
|
||||
#endif
|
||||
return stream_.next_layer().is_open();
|
||||
}
|
||||
auto& next_layer() { return stream_; }
|
||||
const auto& next_layer() const { return stream_; }
|
||||
|
||||
@@ -161,25 +216,56 @@ public:
|
||||
token);
|
||||
}
|
||||
|
||||
// These functions should only be used with callbacks (e.g. within async_compose function bodies)
|
||||
template <class ConstBufferSequence, class CompletionToken>
|
||||
auto async_write_some(const ConstBufferSequence& buffers, CompletionToken&& token)
|
||||
void async_write_some(const ConstBufferSequence& buffers, CompletionToken&& token)
|
||||
{
|
||||
if (use_ssl_) {
|
||||
return stream_.async_write_some(buffers, std::forward<CompletionToken>(token));
|
||||
} else {
|
||||
return stream_.next_layer().async_write_some(
|
||||
buffers,
|
||||
std::forward<CompletionToken>(token));
|
||||
switch (transport_) {
|
||||
case transport_type::tcp:
|
||||
{
|
||||
stream_.next_layer().async_write_some(buffers, std::forward<CompletionToken>(token));
|
||||
break;
|
||||
}
|
||||
case transport_type::tcp_tls:
|
||||
{
|
||||
stream_.async_write_some(buffers, std::forward<CompletionToken>(token));
|
||||
break;
|
||||
}
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
case transport_type::unix_socket:
|
||||
{
|
||||
unix_socket_.async_write_some(buffers, std::forward<CompletionToken>(token));
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
default: BOOST_ASSERT(false);
|
||||
}
|
||||
}
|
||||
|
||||
template <class MutableBufferSequence, class CompletionToken>
|
||||
auto async_read_some(const MutableBufferSequence& buffers, CompletionToken&& token)
|
||||
void async_read_some(const MutableBufferSequence& buffers, CompletionToken&& token)
|
||||
{
|
||||
if (use_ssl_) {
|
||||
return stream_.async_read_some(buffers, std::forward<CompletionToken>(token));
|
||||
} else {
|
||||
return stream_.next_layer().async_read_some(buffers, std::forward<CompletionToken>(token));
|
||||
switch (transport_) {
|
||||
case transport_type::tcp:
|
||||
{
|
||||
return stream_.next_layer().async_read_some(
|
||||
buffers,
|
||||
std::forward<CompletionToken>(token));
|
||||
break;
|
||||
}
|
||||
case transport_type::tcp_tls:
|
||||
{
|
||||
return stream_.async_read_some(buffers, std::forward<CompletionToken>(token));
|
||||
break;
|
||||
}
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
case transport_type::unix_socket:
|
||||
{
|
||||
unix_socket_.async_read_some(buffers, std::forward<CompletionToken>(token));
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
default: BOOST_ASSERT(false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,10 +274,13 @@ public:
|
||||
|
||||
void close()
|
||||
{
|
||||
if (stream_.next_layer().is_open()) {
|
||||
system::error_code ec;
|
||||
system::error_code ec;
|
||||
if (stream_.next_layer().is_open())
|
||||
stream_.next_layer().close(ec);
|
||||
}
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
if (unix_socket_.is_open())
|
||||
unix_socket_.close(ec);
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -84,6 +84,12 @@ enum class error
|
||||
|
||||
/// Resp3 hello command error
|
||||
resp3_hello,
|
||||
|
||||
/// The configuration specified a UNIX socket address, but UNIX sockets are not supported by the system.
|
||||
unix_sockets_unsupported,
|
||||
|
||||
/// The configuration specified UNIX sockets with SSL, which is not supported.
|
||||
unix_sockets_ssl_unsupported,
|
||||
};
|
||||
|
||||
/** \internal
|
||||
|
||||
@@ -45,7 +45,12 @@ struct error_category_impl : system::error_category {
|
||||
return "Can't receive server push synchronously without blocking.";
|
||||
case error::incompatible_node_depth: return "Incompatible node depth.";
|
||||
case error::resp3_hello: return "RESP3 handshake error (hello command).";
|
||||
default: BOOST_ASSERT(false); return "Boost.Redis error.";
|
||||
case error::unix_sockets_unsupported:
|
||||
return "The configuration specified a UNIX socket address, but UNIX sockets are not "
|
||||
"supported by the system.";
|
||||
case error::unix_sockets_ssl_unsupported:
|
||||
return "The configuration specified UNIX sockets with SSL, which is not supported.";
|
||||
default: BOOST_ASSERT(false); return "Boost.Redis error.";
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
#include <iostream>
|
||||
#include <iterator>
|
||||
#include <string_view>
|
||||
|
||||
namespace boost::redis {
|
||||
|
||||
@@ -64,6 +65,25 @@ void logger::on_connect(system::error_code const& ec, asio::ip::tcp::endpoint co
|
||||
std::clog << std::endl;
|
||||
}
|
||||
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
void logger::on_connect(system::error_code const& ec, std::string_view unix_socket_ep)
|
||||
{
|
||||
if (level_ < level::info)
|
||||
return;
|
||||
|
||||
write_prefix();
|
||||
|
||||
std::clog << "connected to ";
|
||||
|
||||
if (ec)
|
||||
std::clog << ec.message() << std::endl;
|
||||
else
|
||||
std::clog << unix_socket_ep;
|
||||
|
||||
std::clog << std::endl;
|
||||
}
|
||||
#endif
|
||||
|
||||
void logger::on_ssl_handshake(system::error_code const& ec)
|
||||
{
|
||||
if (level_ < level::info)
|
||||
@@ -142,4 +162,14 @@ void logger::trace(std::string_view op, system::error_code const& ec)
|
||||
std::clog << op << ": " << ec.message() << std::endl;
|
||||
}
|
||||
|
||||
void logger::log_error(std::string_view op, system::error_code const& ec)
|
||||
{
|
||||
if (level_ < level::err)
|
||||
return;
|
||||
|
||||
write_prefix();
|
||||
|
||||
std::clog << op << ": " << ec.message() << std::endl;
|
||||
}
|
||||
|
||||
} // namespace boost::redis
|
||||
|
||||
@@ -88,6 +88,9 @@ public:
|
||||
* @param ep Endpoint to which the connection connected.
|
||||
*/
|
||||
void on_connect(system::error_code const& ec, asio::ip::tcp::endpoint const& ep);
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
void on_connect(system::error_code const& ec, std::string_view unix_socket_ep);
|
||||
#endif
|
||||
|
||||
/** @brief Called when the ssl handshake operation completes.
|
||||
* @ingroup high-level-api
|
||||
@@ -129,6 +132,7 @@ public:
|
||||
|
||||
void trace(std::string_view message);
|
||||
void trace(std::string_view op, system::error_code const& ec);
|
||||
void log_error(std::string_view op, system::error_code const& ec);
|
||||
|
||||
private:
|
||||
void write_prefix();
|
||||
|
||||
@@ -55,6 +55,7 @@ make_test(test_conn_echo_stress)
|
||||
make_test(test_issue_50)
|
||||
make_test(test_issue_181)
|
||||
make_test(test_conversions)
|
||||
make_test(test_unix_sockets)
|
||||
|
||||
# Coverage
|
||||
set(
|
||||
|
||||
@@ -147,7 +147,8 @@ BOOST_AUTO_TEST_CASE(reconnection)
|
||||
net::io_context ioc;
|
||||
net::steady_timer timer{ioc};
|
||||
connection conn{ioc};
|
||||
auto const cfg = make_tls_config();
|
||||
auto cfg = make_tls_config();
|
||||
cfg.reconnect_wait_interval = 10ms; // make the test run faster
|
||||
|
||||
request ping_request;
|
||||
ping_request.push("PING", "some_value");
|
||||
|
||||
255
test/test_unix_sockets.cpp
Normal file
255
test/test_unix_sockets.cpp
Normal file
@@ -0,0 +1,255 @@
|
||||
//
|
||||
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
|
||||
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
|
||||
//
|
||||
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
||||
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
||||
//
|
||||
|
||||
#include <boost/redis/connection.hpp>
|
||||
#include <boost/redis/request.hpp>
|
||||
#include <boost/redis/response.hpp>
|
||||
|
||||
#include <boost/asio/error.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/local/basic_endpoint.hpp>
|
||||
#include <boost/core/lightweight_test.hpp>
|
||||
#include <boost/system/error_code.hpp>
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
#include <cstddef>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
using boost::system::error_code;
|
||||
namespace net = boost::asio;
|
||||
using namespace boost::redis;
|
||||
using namespace std::chrono_literals;
|
||||
using namespace std::string_view_literals;
|
||||
|
||||
namespace {
|
||||
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
|
||||
constexpr std::string_view unix_socket_path = "/tmp/redis-socks/redis.sock";
|
||||
|
||||
// Executing commands using UNIX sockets works
|
||||
void test_exec()
|
||||
{
|
||||
// Setup
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
auto cfg = make_test_config();
|
||||
cfg.unix_socket = unix_socket_path;
|
||||
bool run_finished = false, exec_finished = false;
|
||||
|
||||
// Run the connection
|
||||
conn.async_run(cfg, {}, [&run_finished](error_code ec) {
|
||||
run_finished = true;
|
||||
BOOST_TEST_EQ(ec, net::error::operation_aborted);
|
||||
});
|
||||
|
||||
// Execute a request
|
||||
request req;
|
||||
req.push("PING", "unix");
|
||||
response<std::string> res;
|
||||
conn.async_exec(req, res, [&](error_code ec, std::size_t) {
|
||||
exec_finished = true;
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
conn.cancel();
|
||||
});
|
||||
|
||||
// Run
|
||||
ioc.run_for(test_timeout);
|
||||
|
||||
// Check
|
||||
BOOST_TEST(exec_finished);
|
||||
BOOST_TEST(run_finished);
|
||||
BOOST_TEST_EQ(std::get<0>(res).value(), "unix"sv);
|
||||
}
|
||||
|
||||
// If the connection is lost when using a UNIX socket, we can reconnect
|
||||
void test_reconnection()
|
||||
{
|
||||
// Setup
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
auto cfg = make_test_config();
|
||||
cfg.unix_socket = unix_socket_path;
|
||||
cfg.reconnect_wait_interval = 10ms; // make the test run faster
|
||||
|
||||
request ping_request;
|
||||
ping_request.push("PING", "some_value");
|
||||
|
||||
request quit_request;
|
||||
quit_request.push("QUIT");
|
||||
|
||||
bool exec_finished = false, run_finished = false;
|
||||
|
||||
// Run the connection
|
||||
conn.async_run(cfg, {}, [&](error_code ec) {
|
||||
run_finished = true;
|
||||
BOOST_TEST(ec == net::error::operation_aborted);
|
||||
});
|
||||
|
||||
// The PING is the end of the callback chain
|
||||
auto ping_callback = [&](error_code ec, std::size_t) {
|
||||
exec_finished = true;
|
||||
BOOST_TEST(ec == error_code());
|
||||
conn.cancel();
|
||||
};
|
||||
|
||||
auto quit_callback = [&](error_code ec, std::size_t) {
|
||||
BOOST_TEST(ec == error_code());
|
||||
|
||||
// If a request is issued immediately after QUIT, the request sometimes
|
||||
// fails, probably due to a race condition. This dispatches any pending
|
||||
// handlers, triggering the reconnection process.
|
||||
// TODO: this should not be required.
|
||||
ioc.poll();
|
||||
conn.async_exec(ping_request, ignore, ping_callback);
|
||||
};
|
||||
|
||||
conn.async_exec(quit_request, ignore, quit_callback);
|
||||
|
||||
ioc.run_for(test_timeout);
|
||||
|
||||
BOOST_TEST(exec_finished);
|
||||
BOOST_TEST(run_finished);
|
||||
}
|
||||
|
||||
// We can freely switch between UNIX sockets and other transports
|
||||
void test_switch_between_transports()
|
||||
{
|
||||
// Setup
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
request req;
|
||||
response<std::string> res1, res2, res3;
|
||||
req.push("PING", "hello");
|
||||
bool finished = false;
|
||||
|
||||
// Create configurations for TLS and UNIX connections
|
||||
auto tcp_tls_cfg = make_test_config();
|
||||
tcp_tls_cfg.use_ssl = true;
|
||||
tcp_tls_cfg.addr.port = "6380";
|
||||
auto unix_cfg = make_test_config();
|
||||
unix_cfg.unix_socket = unix_socket_path;
|
||||
|
||||
// After the last TCP/TLS run, exit
|
||||
auto on_run_tls_2 = [&](error_code ec) {
|
||||
finished = true;
|
||||
std::cout << "Run (TCP/TLS 2) finished\n";
|
||||
BOOST_TEST_EQ(ec, net::error::operation_aborted);
|
||||
};
|
||||
|
||||
// After UNIX sockets, switch back to TCP/tLS
|
||||
auto on_run_unix = [&](error_code ec) {
|
||||
std::cout << "Run (UNIX) finished\n";
|
||||
BOOST_TEST_EQ(ec, net::error::operation_aborted);
|
||||
|
||||
// Change to using TCP with TLS again
|
||||
conn.async_run(unix_cfg, {}, on_run_tls_2);
|
||||
conn.async_exec(req, res3, [&](error_code ec, std::size_t) {
|
||||
std::cout << "Exec 3 finished\n";
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_EQ(std::get<0>(res3).value(), "hello");
|
||||
conn.cancel();
|
||||
});
|
||||
};
|
||||
|
||||
// After TCP/TLS, change to UNIX sockets
|
||||
auto on_run_tls_1 = [&](error_code ec) {
|
||||
std::cout << "Run (TCP/TLS 1) finished\n";
|
||||
BOOST_TEST_EQ(ec, net::error::operation_aborted);
|
||||
|
||||
conn.async_run(unix_cfg, {}, on_run_unix);
|
||||
conn.async_exec(req, res2, [&](error_code ec, std::size_t) {
|
||||
std::cout << "Exec 2 finished\n";
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_EQ(std::get<0>(res2).value(), "hello");
|
||||
conn.cancel();
|
||||
});
|
||||
};
|
||||
|
||||
// Start with TCP/TLS
|
||||
conn.async_run(tcp_tls_cfg, {}, on_run_tls_1);
|
||||
conn.async_exec(req, res1, [&](error_code ec, std::size_t) {
|
||||
std::cout << "Exec 1 finished\n";
|
||||
BOOST_TEST_EQ(ec, error_code());
|
||||
BOOST_TEST_EQ(std::get<0>(res1).value(), "hello");
|
||||
conn.cancel();
|
||||
});
|
||||
|
||||
// Run the test
|
||||
ioc.run_for(test_timeout);
|
||||
BOOST_TEST(finished);
|
||||
}
|
||||
|
||||
// Trying to enable TLS and UNIX sockets at the same time
|
||||
// is an error and makes async_run exit immediately
|
||||
void test_error_unix_tls()
|
||||
{
|
||||
// Setup
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
auto cfg = make_test_config();
|
||||
cfg.use_ssl = true;
|
||||
cfg.addr.port = "6380";
|
||||
cfg.unix_socket = unix_socket_path;
|
||||
bool finished = false;
|
||||
|
||||
// Run the connection
|
||||
conn.async_run(cfg, {}, [&finished](error_code ec) {
|
||||
BOOST_TEST_EQ(ec, error::unix_sockets_ssl_unsupported);
|
||||
finished = true;
|
||||
});
|
||||
|
||||
// Run the test
|
||||
ioc.run_for(test_timeout);
|
||||
BOOST_TEST(finished);
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
// Trying to enable TLS and UNIX sockets at the same time
|
||||
// is an error and makes async_run exit immediately
|
||||
void test_unix_not_supported()
|
||||
{
|
||||
// Setup
|
||||
net::io_context ioc;
|
||||
connection conn{ioc};
|
||||
auto cfg = make_test_config();
|
||||
cfg.unix_socket = "/some/path.sock";
|
||||
bool finished = false;
|
||||
|
||||
// Run the connection
|
||||
conn.async_run(cfg, {}, [&finished](error_code ec) {
|
||||
BOOST_TEST_EQ(ec, error::unix_sockets_unsupported);
|
||||
finished = true;
|
||||
});
|
||||
|
||||
// Run the test
|
||||
ioc.run_for(test_timeout);
|
||||
BOOST_TEST(finished);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
} // namespace
|
||||
|
||||
int main()
|
||||
{
|
||||
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
|
||||
test_exec();
|
||||
test_reconnection();
|
||||
test_switch_between_transports();
|
||||
test_error_unix_tls();
|
||||
#else
|
||||
test_unix_not_supported();
|
||||
#endif
|
||||
|
||||
return boost::report_errors();
|
||||
}
|
||||
@@ -1,16 +1,10 @@
|
||||
services:
|
||||
redis:
|
||||
image: "redis:alpine"
|
||||
command: [
|
||||
"redis-server",
|
||||
"--tls-port", "6380",
|
||||
"--tls-cert-file", "/tls/server.crt",
|
||||
"--tls-key-file", "/tls/server.key",
|
||||
"--tls-ca-cert-file", "/tls/ca.crt",
|
||||
"--tls-auth-clients", "no",
|
||||
]
|
||||
entrypoint: "/docker/entrypoint.sh"
|
||||
volumes:
|
||||
- ./tls:/tls
|
||||
- ./docker:/docker
|
||||
- /tmp/redis-socks:/tmp/redis-socks
|
||||
ports:
|
||||
- 6379:6379
|
||||
- 6380:6380
|
||||
@@ -22,3 +16,4 @@ services:
|
||||
- BOOST_REDIS_TEST_SERVER=redis
|
||||
volumes:
|
||||
- ../:/boost-redis
|
||||
- /tmp/redis-socks:/tmp/redis-socks
|
||||
|
||||
16
tools/docker/entrypoint.sh
Executable file
16
tools/docker/entrypoint.sh
Executable file
@@ -0,0 +1,16 @@
|
||||
#!/bin/sh
|
||||
# The Redis container entrypoint. Runs the server with the required
|
||||
# flags and makes the socket accessible
|
||||
|
||||
set -e
|
||||
|
||||
chmod 777 /tmp/redis-socks
|
||||
|
||||
redis-server \
|
||||
--tls-port 6380 \
|
||||
--tls-cert-file /docker/tls/server.crt \
|
||||
--tls-key-file /docker/tls/server.key \
|
||||
--tls-ca-cert-file /docker/tls/ca.crt \
|
||||
--tls-auth-clients no \
|
||||
--unixsocket /tmp/redis-socks/redis.sock \
|
||||
--unixsocketperm 777
|
||||
Reference in New Issue
Block a user