2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Merge pull request #248 from boostorg/refactoring_clean_code

Sans-io multiplexing
This commit is contained in:
Marcelo
2025-05-10 20:52:10 +02:00
committed by GitHub
12 changed files with 787 additions and 460 deletions

View File

@@ -18,14 +18,6 @@
namespace boost::redis {
namespace detail {
// Forward decl
template <class Executor>
class basic_connection;
}
/** @brief A type-erased reference to a response.
* @ingroup high-level-api
*
@@ -39,9 +31,9 @@ class basic_connection;
* co_await conn.async_exec(req, any_response(resp));
* ```
*/
class any_adapter
{
using fn_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
class any_adapter {
public:
using fn_type = std::function<void(std::size_t, resp3::node_view const&, system::error_code&)>;
struct impl_t {
fn_type adapt_fn;
@@ -60,7 +52,6 @@ class any_adapter
template <class Executor>
friend class basic_connection;
public:
/**
* @brief Constructor.
*

View File

@@ -10,6 +10,7 @@
#include <string>
#include <chrono>
#include <optional>
#include <limits>
namespace boost::redis
{
@@ -78,6 +79,13 @@ struct config {
* To disable reconnection pass zero as duration.
*/
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};
/** @brief Maximum size of a socket read.
*
* Sets a limit on how much data is allowed to be read into the
* read buffer. It can be used to prevent DDOS.
*/
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
};
} // boost::redis

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
@@ -13,6 +13,7 @@
#include <boost/redis/detail/connector.hpp>
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/resolver.hpp>
#include <boost/redis/detail/resp3_handshaker.hpp>
#include <boost/redis/error.hpp>
@@ -44,28 +45,17 @@
#include <boost/core/ignore_unused.hpp>
#include <algorithm>
#include <array>
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <limits>
#include <memory>
#include <string_view>
#include <utility>
namespace boost::redis {
namespace detail
{
template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
return std::string_view{start, std::size(buf)};
}
template <class AsyncReadStream, class DynamicBuffer>
class append_some_op {
private:
@@ -119,23 +109,28 @@ async_append_some(
>(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
}
template <class Executor>
using exec_notifier_type =
asio::experimental::channel<Executor, void(system::error_code, std::size_t)>;
template <class Conn>
struct exec_op {
using req_info_type = typename Conn::req_info;
using adapter_type = typename Conn::adapter_type;
using req_info_type = typename multiplexer::elem;
using executor_type = typename Conn::executor_type;
Conn* conn_ = nullptr;
std::shared_ptr<exec_notifier_type<executor_type>> notifier_ = nullptr;
std::shared_ptr<req_info_type> info_ = nullptr;
asio::coroutine coro{};
asio::coroutine coro_{};
template <class Self>
void operator()(Self& self , system::error_code = {}, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER (coro)
BOOST_ASIO_CORO_REENTER (coro_)
{
// Check whether the user wants to wait for the connection to
// be stablished.
if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
if (info_->get_request().get_config().cancel_if_not_connected && !conn_->is_open()) {
BOOST_ASIO_CORO_YIELD
asio::dispatch(
asio::get_associated_immediate_executor(self, self.get_io_executor()),
@@ -143,25 +138,22 @@ struct exec_op {
return self.complete(error::not_connected, 0);
}
conn_->add_request_info(info_);
conn_->mpx_.add(info_);
if (conn_->trigger_write()) {
conn_->writer_timer_.cancel();
}
EXEC_OP_WAIT:
BOOST_ASIO_CORO_YIELD
info_->async_wait(std::move(self));
notifier_->async_receive(std::move(self));
if (info_->ec_) {
self.complete(info_->ec_, 0);
if (info_->get_error()) {
self.complete(info_->get_error(), 0);
return;
}
if (info_->stop_requested()) {
// Don't have to call remove_request as it has already
// been by cancel(exec).
return self.complete(asio::error::operation_aborted, 0);
}
if (is_cancelled(self)) {
if (!info_->is_waiting()) {
if (!conn_->mpx_.remove(info_)) {
using c_t = asio::cancellation_type;
auto const c = self.get_cancellation_state().cancelled();
if ((c & c_t::terminal) != c_t::none) {
@@ -178,14 +170,13 @@ EXEC_OP_WAIT:
goto EXEC_OP_WAIT;
}
} else {
// Cancelation can be honored.
conn_->remove_request(info_);
// Cancelation honored.
self.complete(asio::error::operation_aborted, 0);
return;
}
}
self.complete(info_->ec_, info_->read_size_);
self.complete(info_->get_error(), info_->get_read_size());
}
}
};
@@ -205,13 +196,22 @@ struct writer_op {
BOOST_ASIO_CORO_REENTER (coro) for (;;)
{
while (conn_->coalesce_requests()) {
if (conn_->use_ssl())
BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
else
BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
while (conn_->mpx_.prepare_write() != 0) {
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
asio::async_write(
conn_->next_layer(),
asio::buffer(conn_->mpx_.get_write_buffer()),
std::move(self));
} else {
BOOST_ASIO_CORO_YIELD
asio::async_write(
conn_->next_layer().next_layer(),
asio::buffer(conn_->mpx_.get_write_buffer()),
std::move(self));
}
logger_.on_write(ec, conn_->write_buffer_);
logger_.on_write(ec, conn_->mpx_.get_write_buffer());
if (ec) {
logger_.trace("writer_op (1)", ec);
@@ -220,7 +220,7 @@ struct writer_op {
return;
}
conn_->on_write();
conn_->mpx_.commit_write();
// A socket.close() may have been called while a
// successful write might had already been queued, so we
@@ -248,11 +248,14 @@ struct writer_op {
template <class Conn, class Logger>
struct reader_op {
using parse_result = typename Conn::parse_result;
using parse_ret_type = typename Conn::parse_ret_type;
using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
// TODO: Move this to config so the user can fine tune?
static constexpr std::size_t buffer_growth_hint = 4096;
Conn* conn_;
Logger logger_;
parse_ret_type res_{parse_result::resp, 0};
std::pair<tribool, std::size_t> res_{std::make_pair(std::make_optional(false), 0)};
asio::coroutine coro{};
template <class Self>
@@ -265,20 +268,20 @@ struct reader_op {
BOOST_ASIO_CORO_REENTER (coro) for (;;)
{
// Appends some data to the buffer if necessary.
if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
if (!res_.first.has_value() || conn_->mpx_.is_data_needed()) {
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
async_append_some(
conn_->next_layer(),
conn_->dbuf_,
conn_->get_suggested_buffer_growth(),
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
std::move(self));
} else {
BOOST_ASIO_CORO_YIELD
async_append_some(
conn_->next_layer().next_layer(),
conn_->dbuf_,
conn_->get_suggested_buffer_growth(),
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
std::move(self));
}
@@ -302,7 +305,7 @@ struct reader_op {
}
}
res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
res_ = conn_->mpx_.commit_read(ec);
if (ec) {
logger_.trace("reader_op (3)", ec);
conn_->cancel(operation::run);
@@ -310,7 +313,7 @@ struct reader_op {
return;
}
if (res_.first == parse_result::push) {
if (res_.first.has_value() && res_.first.value()) {
if (!conn_->receive_channel_.try_send(ec, res_.second)) {
BOOST_ASIO_CORO_YIELD
conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
@@ -404,7 +407,7 @@ public:
}
}
conn_->reset();
conn_->mpx_.reset();
// Note: Order is important here because the writer might
// trigger an async_write before the async_hello thereby
@@ -501,21 +504,17 @@ public:
*
* @param ex Executor on which connection operation will run.
* @param ctx SSL context.
* @param max_read_size Maximum read size that is passed to
* the internal `asio::dynamic_buffer` constructor.
*/
explicit
basic_connection(
executor_type ex,
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client})
: ctx_{std::move(ctx)}
, stream_{std::make_unique<next_layer_type>(ex, ctx_)}
, writer_timer_{ex}
, receive_channel_{ex, 256}
, resv_{ex}
, health_checker_{ex}
, dbuf_{read_buffer_, max_read_size}
{
set_receive_response(ignore);
writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
@@ -525,9 +524,8 @@ public:
explicit
basic_connection(
asio::io_context& ioc,
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
: basic_connection(ioc.get_executor(), std::move(ctx), max_read_size)
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client})
: basic_connection(ioc.get_executor(), std::move(ctx))
{ }
/** @brief Starts underlying connection operations.
@@ -697,12 +695,17 @@ public:
auto& adapter_impl = adapter.impl_;
BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size, "Request and response have incompatible sizes.");
auto info = std::make_shared<req_info>(req, std::move(adapter_impl.adapt_fn), get_executor());
auto notifier = std::make_shared<detail::exec_notifier_type<executor_type>>(get_executor(), 1);
auto info = detail::make_elem(req, std::move(adapter_impl.adapt_fn));
info->set_done_callback([notifier]() {
notifier->try_send(std::error_code{}, 0);
});
return asio::async_compose
< CompletionToken
, void(system::error_code, std::size_t)
>(detail::exec_op<this_type>{this, info}, token, writer_timer_);
>(detail::exec_op<this_type>{this, notifier, info}, token, writer_timer_);
}
/** @brief Cancel operations.
@@ -723,7 +726,7 @@ public:
resv_.cancel();
break;
case operation::exec:
cancel_unwritten_requests();
mpx_.cancel_waiting();
break;
case operation::reconnection:
cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
@@ -743,14 +746,14 @@ public:
health_checker_.cancel();
cancel_run(); // run
receive_channel_.cancel(); // receive
cancel_unwritten_requests(); // exec
mpx_.cancel_waiting(); // exec
break;
default: /* ignore */;
}
}
auto run_is_canceled() const noexcept
{ return cancel_run_called_; }
{ return mpx_.get_cancel_run_state(); }
/// Returns true if the connection was canceled.
bool will_reconnect() const noexcept
@@ -771,18 +774,15 @@ public:
/// Returns a const reference to the next layer.
auto const& next_layer() const noexcept
{ return *stream_; }
/// Sets the response object of `async_receive` operations.
template <class Response>
void set_receive_response(Response& response)
{
using namespace boost::redis::adapter;
auto g = boost_redis_adapt(response);
receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
}
{ mpx_.set_receive_response(response); }
/// Returns connection usage information.
usage get_usage() const noexcept
{ return usage_; }
{ return mpx_.get_usage(); }
private:
using clock_type = std::chrono::steady_clock;
@@ -793,220 +793,23 @@ private:
using resolver_type = detail::resolver<Executor>;
using health_checker_type = detail::health_checker<Executor>;
using resp3_handshaker_type = detail::resp3_handshaker<executor_type>;
using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
using exec_notifier_type = receive_channel_type;
auto use_ssl() const noexcept
{ return cfg_.use_ssl;}
auto cancel_on_conn_lost() -> std::size_t
{
// Must return false if the request should be removed.
auto cond = [](auto const& ptr)
{
BOOST_ASSERT(ptr != nullptr);
if (ptr->is_waiting()) {
return !ptr->req_->get_config().cancel_on_connection_lost;
} else {
return !ptr->req_->get_config().cancel_if_unresponded;
}
};
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
auto const ret = std::distance(point, std::end(reqs_));
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->stop();
});
reqs_.erase(point, std::end(reqs_));
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return ptr->mark_waiting();
});
return ret;
}
auto cancel_unwritten_requests() -> std::size_t
{
auto f = [](auto const& ptr)
{
BOOST_ASSERT(ptr != nullptr);
return !ptr->is_waiting();
};
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
auto const ret = std::distance(point, std::end(reqs_));
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->stop();
});
reqs_.erase(point, std::end(reqs_));
return ret;
}
void cancel_run()
{
// Protects the code below from being called more than
// once, see https://github.com/boostorg/redis/issues/181
if (std::exchange(cancel_run_called_, true)) {
return;
}
close();
writer_timer_.cancel();
receive_channel_.cancel();
cancel_on_conn_lost();
mpx_.cancel_on_conn_lost();
}
void on_write()
{
// We have to clear the payload right after writing it to use it
// as a flag that informs there is no ongoing write.
write_buffer_.clear();
// Notice this must come before the for-each below.
cancel_push_requests();
// There is small optimization possible here: traverse only the
// partition of unwritten requests instead of them all.
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
if (ptr->is_staged()) {
ptr->mark_written();
}
});
}
struct req_info {
public:
using node_type = resp3::basic_node<std::string_view>;
using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;
explicit req_info(request const& req, adapter_type adapter, executor_type ex)
: notifier_{ex, 1}
, req_{&req}
, adapter_{}
, expected_responses_{req.get_expected_responses()}
, status_{status::waiting}
, ec_{{}}
, read_size_{0}
{
adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
{
auto const i = req_->get_expected_responses() - expected_responses_;
adapter(i, nd, ec);
};
}
auto proceed()
{
notifier_.try_send(std::error_code{}, 0);
}
void stop()
{
notifier_.close();
}
[[nodiscard]] auto is_waiting() const noexcept
{ return status_ == status::waiting; }
[[nodiscard]] auto is_written() const noexcept
{ return status_ == status::written; }
[[nodiscard]] auto is_staged() const noexcept
{ return status_ == status::staged; }
void mark_written() noexcept
{ status_ = status::written; }
void mark_staged() noexcept
{ status_ = status::staged; }
void mark_waiting() noexcept
{ status_ = status::waiting; }
[[nodiscard]] auto stop_requested() const noexcept
{ return !notifier_.is_open();}
template <class CompletionToken>
auto async_wait(CompletionToken&& token)
{
return notifier_.async_receive(std::forward<CompletionToken>(token));
}
//private:
enum class status
{ waiting
, staged
, written
};
exec_notifier_type notifier_;
request const* req_;
wrapped_adapter_type adapter_;
// Contains the number of commands that haven't been read yet.
std::size_t expected_responses_;
status status_;
system::error_code ec_;
std::size_t read_size_;
};
void remove_request(std::shared_ptr<req_info> const& info)
{
reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
}
using reqs_type = std::deque<std::shared_ptr<req_info>>;
template <class, class> friend struct detail::reader_op;
template <class, class> friend struct detail::writer_op;
template <class> friend struct detail::exec_op;
template <class, class> friend class detail::run_op;
void cancel_push_requests()
{
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
});
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->proceed();
});
reqs_.erase(point, std::end(reqs_));
}
[[nodiscard]] bool is_writing() const noexcept
{
return !write_buffer_.empty();
}
void add_request_info(std::shared_ptr<req_info> const& info)
{
reqs_.push_back(info);
if (info->req_->has_hello_priority()) {
auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
return e->is_waiting();
});
std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
}
if (is_open() && !is_writing())
writer_timer_.cancel();
}
template <class CompletionToken, class Logger>
auto reader(Logger l, CompletionToken&& token)
{
@@ -1026,38 +829,6 @@ private:
>(detail::writer_op<this_type, Logger>{this, l}, std::forward<CompletionToken>(token), writer_timer_);
}
[[nodiscard]] bool coalesce_requests()
{
// Coalesces the requests and marks them staged. After a
// successful write staged requests will be marked as written.
auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
return !ri->is_waiting();
});
std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
// Stage the request.
write_buffer_ += ri->req_->payload();
ri->mark_staged();
usage_.commands_sent += ri->expected_responses_;
});
usage_.bytes_sent += std::size(write_buffer_);
return point != std::cend(reqs_);
}
bool is_waiting_response() const noexcept
{
if (std::empty(reqs_))
return false;
// Under load and on low-latency networks we might start
// receiving responses before the write operation completed and
// the request is still maked as staged and not written. See
// https://github.com/boostorg/redis/issues/170
return !reqs_.front()->is_waiting();
}
void close()
{
if (stream_->next_layer().is_open()) {
@@ -1069,126 +840,9 @@ private:
auto is_open() const noexcept { return stream_->next_layer().is_open(); }
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
auto is_next_push()
[[nodiscard]] bool trigger_write() const noexcept
{
BOOST_ASSERT(!read_buffer_.empty());
// Useful links to understand the heuristics below.
//
// - https://github.com/redis/redis/issues/11784
// - https://github.com/redis/redis/issues/6426
// - https://github.com/boostorg/redis/issues/170
// The message's resp3 type is a push.
if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
return true;
// This is non-push type and the requests queue is empty. I have
// noticed this is possible, for example with -MISCONF. I don't
// know why they are not sent with a push type so we can
// distinguish them from responses to commands. If we are lucky
// enough to receive them when the command queue is empty they
// can be treated as server pushes, otherwise it is impossible
// to handle them properly
if (reqs_.empty())
return true;
// The request does not expect any response but we got one. This
// may happen if for example, subscribe with wrong syntax.
if (reqs_.front()->expected_responses_ == 0)
return true;
// Added to deal with MONITOR and also to fix PR170 which
// happens under load and on low-latency networks, where we
// might start receiving responses before the write operation
// completed and the request is still maked as staged and not
// written.
return reqs_.front()->is_waiting();
}
auto get_suggested_buffer_growth() const noexcept
{
return parser_.get_suggested_buffer_growth(4096);
}
enum class parse_result { needs_more, push, resp };
using parse_ret_type = std::pair<parse_result, std::size_t>;
parse_ret_type on_finish_parsing(parse_result t)
{
if (t == parse_result::push) {
usage_.pushes_received += 1;
usage_.push_bytes_received += parser_.get_consumed();
} else {
usage_.responses_received += 1;
usage_.response_bytes_received += parser_.get_consumed();
}
on_push_ = false;
dbuf_.consume(parser_.get_consumed());
auto const res = std::make_pair(t, parser_.get_consumed());
parser_.reset();
return res;
}
parse_ret_type on_read(std::string_view data, system::error_code& ec)
{
// We arrive here in two states:
//
// 1. While we are parsing a message. In this case we
// don't want to determine the type of the message in the
// buffer (i.e. response vs push) but leave it untouched
// until the parsing of a complete message ends.
//
// 2. On a new message, in which case we have to determine
// whether the next message is a push or a response.
//
if (!on_push_) // Prepare for new message.
on_push_ = is_next_push();
if (on_push_) {
if (!resp3::parse(parser_, data, receive_adapter_, ec))
return std::make_pair(parse_result::needs_more, 0);
if (ec)
return std::make_pair(parse_result::push, 0);
return on_finish_parsing(parse_result::push);
}
BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
BOOST_ASSERT(!reqs_.empty());
BOOST_ASSERT(reqs_.front() != nullptr);
BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
return std::make_pair(parse_result::needs_more, 0);
if (ec) {
reqs_.front()->ec_ = ec;
reqs_.front()->proceed();
return std::make_pair(parse_result::resp, 0);
}
reqs_.front()->read_size_ += parser_.get_consumed();
if (--reqs_.front()->expected_responses_ == 0) {
// Done with this request.
reqs_.front()->proceed();
reqs_.pop_front();
}
return on_finish_parsing(parse_result::resp);
}
void reset()
{
write_buffer_.clear();
read_buffer_.clear();
parser_.reset();
on_push_ = false;
cancel_run_called_ = false;
return is_open() && !mpx_.is_writing();
}
asio::ssl::context ctx_;
@@ -1203,20 +857,9 @@ private:
detail::connector ctor_;
health_checker_type health_checker_;
resp3_handshaker_type handshaker_;
receiver_adapter_type receive_adapter_;
using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
config cfg_;
std::string read_buffer_;
dyn_buffer_type dbuf_;
std::string write_buffer_;
reqs_type reqs_;
resp3::parser parser_{};
bool on_push_ = false;
bool cancel_run_called_ = false;
usage usage_;
detail::multiplexer mpx_;
};
/** \brief A basic_connection that type erases the executor.
@@ -1237,15 +880,13 @@ public:
explicit
connection(
executor_type ex,
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client});
/// Contructs from a context.
explicit
connection(
asio::io_context& ioc,
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client},
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client});
/// Returns the underlying executor.
executor_type get_executor() noexcept

View File

@@ -0,0 +1,200 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_MULTIPLEXER_HPP
#define BOOST_REDIS_MULTIPLEXER_HPP
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/usage.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <algorithm>
#include <deque>
#include <functional>
#include <memory>
#include <string_view>
#include <utility>
#include <optional>
namespace boost::redis
{
class request;
namespace detail {
using tribool = std::optional<bool>;
struct multiplexer {
using adapter_type = std::function<void(resp3::node_view const&, system::error_code&)>;
using pipeline_adapter_type = std::function<void(std::size_t, resp3::node_view const&, system::error_code&)>;
struct elem {
public:
explicit elem(request const& req, pipeline_adapter_type adapter);
void set_done_callback(std::function<void()> f) noexcept
{ done_ = std::move(f); };
auto notify_done() noexcept
{ done_(); }
auto notify_error(system::error_code ec) noexcept -> void;
[[nodiscard]]
auto is_waiting() const noexcept
{ return status_ == status::waiting; }
[[nodiscard]]
auto is_written() const noexcept
{ return status_ == status::written; }
[[nodiscard]]
auto is_staged() const noexcept
{ return status_ == status::staged; }
void mark_written() noexcept
{ status_ = status::written; }
void mark_staged() noexcept
{ status_ = status::staged; }
void mark_waiting() noexcept
{ status_ = status::waiting; }
auto get_error() const -> system::error_code const&
{ return ec_; }
auto get_request() const -> request const&
{ return *req_; }
auto get_read_size() const -> std::size_t
{ return read_size_; }
auto get_remaining_responses() const -> std::size_t
{ return remaining_responses_; }
auto commit_response(std::size_t read_size) -> void;
auto get_adapter() -> adapter_type&
{ return adapter_; }
private:
enum class status
{ waiting
, staged
, written
};
request const* req_;
adapter_type adapter_;
std::function<void()> done_;
// Contains the number of commands that haven't been read yet.
std::size_t remaining_responses_;
status status_;
system::error_code ec_;
std::size_t read_size_;
};
auto remove(std::shared_ptr<elem> const& ptr) -> bool;
[[nodiscard]]
auto prepare_write() -> std::size_t;
// Returns the number of requests that have been released because
// they don't have a response e.g. SUBSCRIBE.
auto commit_write() -> std::size_t;
[[nodiscard]]
auto commit_read(system::error_code& ec) -> std::pair<tribool, std::size_t>;
auto add(std::shared_ptr<elem> const& ptr) -> void;
auto reset() -> void;
[[nodiscard]]
auto const& get_parser() const noexcept
{ return parser_; }
//[[nodiscard]]
auto cancel_waiting() -> std::size_t;
//[[nodiscard]]
auto cancel_on_conn_lost() -> std::size_t;
[[nodiscard]]
auto get_cancel_run_state() const noexcept -> bool
{ return cancel_run_called_; }
[[nodiscard]]
auto get_write_buffer() noexcept -> std::string_view
{ return std::string_view{write_buffer_}; }
[[nodiscard]]
auto get_read_buffer() noexcept -> std::string&
{ return read_buffer_; }
[[nodiscard]]
auto is_data_needed() const noexcept -> bool
{ return std::empty(read_buffer_); }
// TODO: Change signature to receive an adapter instead of a
// response.
template <class Response>
void set_receive_response(Response& response)
{
using namespace boost::redis::adapter;
auto g = boost_redis_adapt(response);
receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
}
[[nodiscard]]
auto get_usage() const noexcept -> usage
{ return usage_;}
[[nodiscard]]
auto is_writing() const noexcept -> bool;
private:
[[nodiscard]]
auto is_waiting_response() const noexcept -> bool;
[[nodiscard]]
auto on_finish_parsing(bool is_push) -> std::size_t;
[[nodiscard]]
auto is_next_push() const noexcept -> bool;
// Releases the number of requests that have been released.
[[nodiscard]]
auto release_push_requests() -> std::size_t;
std::string read_buffer_;
std::string write_buffer_;
std::deque<std::shared_ptr<elem>> reqs_;
resp3::parser parser_{};
bool on_push_ = false;
bool cancel_run_called_ = false;
usage usage_;
adapter_type receive_adapter_;
};
auto
make_elem(
request const& req,
multiplexer::pipeline_adapter_type adapter) -> std::shared_ptr<multiplexer::elem>;
} // detail
} // boost::redis
#endif // BOOST_REDIS_MULTIPLEXER_HPP

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
@@ -9,18 +9,12 @@
namespace boost::redis {
connection::connection(
executor_type ex,
asio::ssl::context ctx,
std::size_t max_read_size)
: impl_{ex, std::move(ctx), max_read_size}
connection::connection(executor_type ex, asio::ssl::context ctx)
: impl_{ex, std::move(ctx)}
{ }
connection::connection(
asio::io_context& ioc,
asio::ssl::context ctx,
std::size_t max_read_size)
: impl_{ioc.get_executor(), std::move(ctx), max_read_size}
connection::connection(asio::io_context& ioc, asio::ssl::context ctx)
: impl_{ioc.get_executor(), std::move(ctx)}
{ }
void

View File

@@ -74,7 +74,7 @@ void logger::on_ssl_handshake(system::error_code const& ec)
void
logger::on_write(
system::error_code const& ec,
std::string const& payload)
std::string_view payload)
{
if (level_ < level::info)
return;

View File

@@ -0,0 +1,321 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/request.hpp>
#include <memory>
namespace boost::redis::detail
{
multiplexer::elem::elem(request const& req, pipeline_adapter_type adapter)
: req_{&req}
, adapter_{}
, remaining_responses_{req.get_expected_responses()}
, status_{status::waiting}
, ec_{{}}
, read_size_{0}
{
adapter_ = [this, adapter](resp3::node_view const& nd, system::error_code& ec)
{
auto const i = req_->get_expected_responses() - remaining_responses_;
adapter(i, nd, ec);
};
}
auto multiplexer::elem::notify_error(system::error_code ec) noexcept -> void
{
if (!ec_) {
ec_ = ec;
}
done_();
}
auto multiplexer::elem::commit_response(std::size_t read_size) -> void
{
read_size_ += read_size;
--remaining_responses_;
}
bool multiplexer::remove(std::shared_ptr<elem> const& ptr)
{
if (ptr->is_waiting()) {
reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), ptr));
return true;
}
return false;
}
std::size_t multiplexer::commit_write()
{
// We have to clear the payload right after writing it to use it
// as a flag that informs there is no ongoing write.
write_buffer_.clear();
// There is small optimization possible here: traverse only the
// partition of unwritten requests instead of them all.
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
if (ptr->is_staged()) {
ptr->mark_written();
}
});
return release_push_requests();
}
void multiplexer::add(std::shared_ptr<elem> const& info)
{
reqs_.push_back(info);
if (info->get_request().has_hello_priority()) {
auto rend =
std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
return e->is_waiting();
});
std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
}
}
std::pair<tribool, std::size_t>
multiplexer::commit_read(system::error_code& ec)
{
// We arrive here in two states:
//
// 1. While we are parsing a message. In this case we
// don't want to determine the type of the message in the
// buffer (i.e. response vs push) but leave it untouched
// until the parsing of a complete message ends.
//
// 2. On a new message, in which case we have to determine
// whether the next messag is a push or a response.
//
if (!on_push_) // Prepare for new message.
on_push_ = is_next_push();
if (on_push_) {
if (!resp3::parse(parser_, read_buffer_, receive_adapter_, ec))
return std::make_pair(std::nullopt, 0);
if (ec)
return std::make_pair(std::make_optional(true), 0);
auto const size = on_finish_parsing(true);
return std::make_pair(std::make_optional(true), size);
}
BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
BOOST_ASSERT(!reqs_.empty());
BOOST_ASSERT(reqs_.front() != nullptr);
BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0);
if (!resp3::parse(parser_, read_buffer_, reqs_.front()->get_adapter(), ec))
return std::make_pair(std::nullopt, 0);
if (ec) {
reqs_.front()->notify_error(ec);
return std::make_pair(std::make_optional(false), 0);
}
reqs_.front()->commit_response(parser_.get_consumed());
if (reqs_.front()->get_remaining_responses() == 0) {
// Done with this request.
reqs_.front()->notify_done();
reqs_.pop_front();
}
auto const size = on_finish_parsing(false);
return std::make_pair(std::make_optional(false), size);
}
void multiplexer::reset()
{
write_buffer_.clear();
read_buffer_.clear();
parser_.reset();
on_push_ = false;
cancel_run_called_ = false;
}
std::size_t multiplexer::prepare_write()
{
// Coalesces the requests and marks them staged. After a
// successful write staged requests will be marked as written.
auto const point =
std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
return !ri->is_waiting();
});
std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
// Stage the request.
write_buffer_ += ri->get_request().payload();
ri->mark_staged();
usage_.commands_sent += ri->get_request().get_commands();
});
usage_.bytes_sent += std::size(write_buffer_);
auto const d = std::distance(point, std::cend(reqs_));
return static_cast<std::size_t>(d);
}
std::size_t multiplexer::cancel_waiting()
{
auto f = [](auto const& ptr)
{
BOOST_ASSERT(ptr != nullptr);
return !ptr->is_waiting();
};
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
auto const ret = std::distance(point, std::end(reqs_));
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->notify_error({asio::error::operation_aborted});
});
reqs_.erase(point, std::end(reqs_));
return ret;
}
auto multiplexer::cancel_on_conn_lost() -> std::size_t
{
// Protects the code below from being called more than
// once, see https://github.com/boostorg/redis/issues/181
if (std::exchange(cancel_run_called_, true)) {
return 0;
}
// Must return false if the request should be removed.
auto cond = [](auto const& ptr)
{
BOOST_ASSERT(ptr != nullptr);
if (ptr->is_waiting()) {
return !ptr->get_request().get_config().cancel_on_connection_lost;
} else {
return !ptr->get_request().get_config().cancel_if_unresponded;
}
};
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
auto const ret = std::distance(point, std::end(reqs_));
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->notify_error({asio::error::operation_aborted});
});
reqs_.erase(point, std::end(reqs_));
std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return ptr->mark_waiting();
});
return ret;
}
std::size_t multiplexer::on_finish_parsing(bool is_push)
{
if (is_push) {
usage_.pushes_received += 1;
usage_.push_bytes_received += parser_.get_consumed();
} else {
usage_.responses_received += 1;
usage_.response_bytes_received += parser_.get_consumed();
}
on_push_ = false;
read_buffer_.erase(0, parser_.get_consumed());
auto const size = parser_.get_consumed();
parser_.reset();
return size;
}
bool multiplexer::is_next_push() const noexcept
{
BOOST_ASSERT(!read_buffer_.empty());
// Useful links to understand the heuristics below.
//
// - https://github.com/redis/redis/issues/11784
// - https://github.com/redis/redis/issues/6426
// - https://github.com/boostorg/redis/issues/170
// The message's resp3 type is a push.
if (resp3::to_type(read_buffer_.front()) == resp3::type::push)
return true;
// This is non-push type and the requests queue is empty. I have
// noticed this is possible, for example with -MISCONF. I don't
// know why they are not sent with a push type so we can
// distinguish them from responses to commands. If we are lucky
// enough to receive them when the command queue is empty they
// can be treated as server pushes, otherwise it is impossible
// to handle them properly
if (reqs_.empty())
return true;
// The request does not expect any response but we got one. This
// may happen if for example, subscribe with wrong syntax.
if (reqs_.front()->get_remaining_responses() == 0)
return true;
// Added to deal with MONITOR and also to fix PR170 which
// happens under load and on low-latency networks, where we
// might start receiving responses before the write operation
// completed and the request is still maked as staged and not
// written.
return reqs_.front()->is_waiting();
}
std::size_t multiplexer::release_push_requests()
{
auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
return !(ptr->is_written() && ptr->get_request().get_expected_responses() == 0);
});
std::for_each(point, std::end(reqs_), [](auto const& ptr) {
ptr->notify_done();
});
auto const d = std::distance(point, std::end(reqs_));
reqs_.erase(point, std::end(reqs_));
return static_cast<std::size_t>(d);
}
bool multiplexer::is_waiting_response() const noexcept
{
if (std::empty(reqs_))
return false;
// Under load and on low-latency networks we might start
// receiving responses before the write operation completed and
// the request is still maked as staged and not written. See
// https://github.com/boostorg/redis/issues/170
return !reqs_.front()->is_waiting();
}
bool multiplexer::is_writing() const noexcept
{
return !write_buffer_.empty();
}
auto
make_elem(
request const& req,
multiplexer::pipeline_adapter_type adapter) -> std::shared_ptr<multiplexer::elem>
{
return std::make_shared<multiplexer::elem>(req, std::move(adapter));
}
}

View File

@@ -97,7 +97,7 @@ public:
* @param ec Error code returned by the write operation.
* @param payload The payload written to the socket.
*/
void on_write(system::error_code const& ec, std::string const& payload);
void on_write(system::error_code const& ec, std::string_view payload);
/** @brief Called when the read operation completes.
* @ingroup high-level-api

View File

@@ -1,4 +1,4 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
@@ -11,6 +11,7 @@
#include <boost/redis/impl/connection.ipp>
#include <boost/redis/impl/response.ipp>
#include <boost/redis/impl/resp3_handshaker.ipp>
#include <boost/redis/impl/multiplexer.ipp>
#include <boost/redis/resp3/impl/type.ipp>
#include <boost/redis/resp3/impl/parser.ipp>
#include <boost/redis/resp3/impl/serialization.ipp>

View File

@@ -52,6 +52,7 @@ boost::redis::config make_test_config()
{
boost::redis::config cfg;
cfg.addr.host = get_server_hostname();
cfg.max_read_size = 1000000;
return cfg;
}

View File

@@ -36,7 +36,7 @@ BOOST_AUTO_TEST_CASE(issue_181)
auto const level = boost::redis::logger::level::debug;
net::io_context ioc;
auto ctx = net::ssl::context{net::ssl::context::tlsv12_client};
basic_connection conn{ioc.get_executor(), std::move(ctx), 1000000};
basic_connection conn{ioc.get_executor(), std::move(ctx)};
net::steady_timer timer{ioc};
timer.expires_after(std::chrono::seconds{1});

View File

@@ -5,8 +5,12 @@
*/
#include <boost/redis/detail/resp3_handshaker.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#define BOOST_TEST_MODULE conn-quit
#include <boost/test/included/unit_test.hpp>
#include <string>
@@ -20,6 +24,11 @@ using boost::redis::adapter::adapt2;
using boost::redis::adapter::result;
using boost::redis::resp3::detail::deserialize;
using boost::redis::ignore_t;
using boost::redis::detail::multiplexer;
using boost::redis::generic_response;
using boost::redis::resp3::node;
using boost::redis::resp3::to_string;
using boost::redis::any_adapter;
BOOST_AUTO_TEST_CASE(low_level_sync_sans_io)
{
@@ -235,3 +244,164 @@ BOOST_AUTO_TEST_CASE(issue_233_optional_array_with_null)
exit(EXIT_FAILURE);
}
}
//===========================================================================
// Multiplexer
std::ostream& operator<<(std::ostream& os, node const& nd)
{
os << to_string(nd.data_type) << "\n"
<< nd.aggregate_size << "\n"
<< nd.depth << "\n"
<< nd.value;
return os;
}
BOOST_AUTO_TEST_CASE(multiplexer_push)
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n";
boost::system::error_code ec;
auto const ret = mpx.commit_read(ec);
BOOST_TEST(ret.first.value());
BOOST_CHECK_EQUAL(ret.second, 16);
// TODO: Provide operator << for generic_response so we can compare
// the whole vector.
BOOST_CHECK_EQUAL(resp.value().size(), 3);
BOOST_CHECK_EQUAL(resp.value().at(1).value, "one");
BOOST_CHECK_EQUAL(resp.value().at(2).value, "two");
for (auto const& e: resp.value())
std::cout << e << std::endl;
}
BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_response(resp);
// Only part of the message.
mpx.get_read_buffer() = ">2\r\n+one\r";
boost::system::error_code ec;
auto ret = mpx.commit_read(ec);
BOOST_TEST(!ret.first.has_value());
mpx.get_read_buffer().append("\n+two\r\n");
ret = mpx.commit_read(ec);
BOOST_TEST(ret.first.value());
BOOST_CHECK_EQUAL(ret.second, 16);
// TODO: Provide operator << for generic_response so we can compare
// the whole vector.
BOOST_CHECK_EQUAL(resp.value().size(), 3);
BOOST_CHECK_EQUAL(resp.value().at(1).value, "one");
BOOST_CHECK_EQUAL(resp.value().at(2).value, "two");
}
struct test_item {
request req;
generic_response resp;
std::shared_ptr<multiplexer::elem> elem_ptr;
bool done = false;
test_item(bool cmd_with_response = true)
{
// The exact command is irrelevant because it is not being sent
// to Redis.
req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg");
elem_ptr = std::make_shared<multiplexer::elem>(req, any_adapter(resp).impl_.adapt_fn);
elem_ptr->set_done_callback([this]() {
done = true;
});
}
};
BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
{
test_item item1{};
test_item item2{false};
test_item item3{};
// Add some requests to the multiplexer.
multiplexer mpx;
mpx.add(item1.elem_ptr);
mpx.add(item3.elem_ptr);
mpx.add(item2.elem_ptr);
// These requests haven't been written yet so their statuses should
// be "waiting.".
BOOST_TEST(item1.elem_ptr->is_waiting());
BOOST_TEST(item2.elem_ptr->is_waiting());
BOOST_TEST(item3.elem_ptr->is_waiting());
// There are three requests to coalesce, a second call should do
// nothing.
BOOST_CHECK_EQUAL(mpx.prepare_write(), 3);
BOOST_CHECK_EQUAL(mpx.prepare_write(), 0);
// After coalescing the requests for writing their statuses should
// be changed to "staged".
BOOST_TEST(item1.elem_ptr->is_staged());
BOOST_TEST(item2.elem_ptr->is_staged());
BOOST_TEST(item3.elem_ptr->is_staged());
// There are no waiting requests to cancel since they are all
// staged.
BOOST_CHECK_EQUAL(mpx.cancel_waiting(), 0);
// Since the requests haven't been sent (written) the done
// callback should not have been called yet.
BOOST_TEST(!item1.done);
BOOST_TEST(!item2.done);
BOOST_TEST(!item3.done);
// The commit_write call informs the multiplexer the payload was
// sent (e.g. written to the socket). This step releases requests
// that has no response.
BOOST_CHECK_EQUAL(mpx.commit_write(), 1);
// The staged status should now have changed to written.
BOOST_TEST(item1.elem_ptr->is_written());
BOOST_TEST(item2.elem_ptr->is_written());
BOOST_TEST(item3.elem_ptr->is_written());
// The done status should still be unchanged on requests that
// expect a response.
BOOST_TEST(!item1.done);
BOOST_TEST( item2.done);
BOOST_TEST(!item3.done);
// Simulates a socket read by putting some data in the read buffer.
mpx.get_read_buffer().append("+one\r\n");
// Informs the multiplexer the read operation is concluded.
boost::system::error_code ec;
auto const ret = mpx.commit_read(ec);
// The read operation should have been successfull.
BOOST_TEST(ret.first.has_value());
BOOST_TEST(ret.second != 0);
// The read buffer should also be empty now
BOOST_TEST(mpx.get_read_buffer().empty());
// The last request still did not get a response.
BOOST_TEST( item1.done);
BOOST_TEST( item2.done);
BOOST_TEST(!item3.done);
// TODO: Check the first request was removed from the queue.
}