From 109248d53b0015befa84ca6ef5f6994b34d1cc52 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 28 Dec 2024 21:33:17 +0100 Subject: [PATCH 1/2] Removes unnecessary dynamic buffer. --- include/boost/redis/config.hpp | 8 +++++ include/boost/redis/connection.hpp | 44 ++++++++----------------- include/boost/redis/impl/connection.ipp | 14 +++----- test/common.cpp | 1 + test/test_issue_181.cpp | 2 +- 5 files changed, 28 insertions(+), 41 deletions(-) diff --git a/include/boost/redis/config.hpp b/include/boost/redis/config.hpp index 297918c4..24b21ce3 100644 --- a/include/boost/redis/config.hpp +++ b/include/boost/redis/config.hpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace boost::redis { @@ -78,6 +79,13 @@ struct config { * To disable reconnection pass zero as duration. */ std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1}; + + /** @brief Maximum size of a socket read. + * + * Sets a limit on how much data is allowed to be read into the + * read buffer. It can be used to prevent DDOS. + */ + std::size_t max_read_size = (std::numeric_limits::max)(); }; } // boost::redis diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 203ce390..d43a00b1 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -50,7 +50,6 @@ #include #include #include -#include #include #include #include @@ -59,13 +58,6 @@ namespace boost::redis { namespace detail { -template -std::string_view buffer_view(DynamicBuffer buf) noexcept -{ - char const* start = static_cast(buf.data(0, buf.size()).data()); - return std::string_view{start, std::size(buf)}; -} - template class append_some_op { private: @@ -250,6 +242,8 @@ template struct reader_op { using parse_result = typename Conn::parse_result; using parse_ret_type = typename Conn::parse_ret_type; + using dyn_buffer_type = asio::dynamic_string_buffer, std::allocator>; + Conn* conn_; Logger logger_; parse_ret_type res_{parse_result::resp, 0}; @@ -270,14 +264,14 @@ struct reader_op { BOOST_ASIO_CORO_YIELD async_append_some( conn_->next_layer(), - conn_->dbuf_, + dyn_buffer_type{conn_->read_buffer_, conn_->cfg_.max_read_size}, conn_->get_suggested_buffer_growth(), std::move(self)); } else { BOOST_ASIO_CORO_YIELD async_append_some( conn_->next_layer().next_layer(), - conn_->dbuf_, + dyn_buffer_type{conn_->read_buffer_, conn_->cfg_.max_read_size}, conn_->get_suggested_buffer_growth(), std::move(self)); } @@ -302,7 +296,7 @@ struct reader_op { } } - res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec); + res_ = conn_->on_read(ec); if (ec) { logger_.trace("reader_op (3)", ec); conn_->cancel(operation::run); @@ -501,21 +495,17 @@ public: * * @param ex Executor on which connection operation will run. * @param ctx SSL context. - * @param max_read_size Maximum read size that is passed to - * the internal `asio::dynamic_buffer` constructor. */ explicit basic_connection( executor_type ex, - asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, - std::size_t max_read_size = (std::numeric_limits::max)()) + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}) : ctx_{std::move(ctx)} , stream_{std::make_unique(ex, ctx_)} , writer_timer_{ex} , receive_channel_{ex, 256} , resv_{ex} , health_checker_{ex} - , dbuf_{read_buffer_, max_read_size} { set_receive_response(ignore); writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)()); @@ -525,9 +515,8 @@ public: explicit basic_connection( asio::io_context& ioc, - asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, - std::size_t max_read_size = (std::numeric_limits::max)()) - : basic_connection(ioc.get_executor(), std::move(ctx), max_read_size) + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}) + : basic_connection(ioc.get_executor(), std::move(ctx)) { } /** @brief Starts underlying connection operations. @@ -1126,13 +1115,13 @@ private: } on_push_ = false; - dbuf_.consume(parser_.get_consumed()); + read_buffer_.erase(0, parser_.get_consumed()); auto const res = std::make_pair(t, parser_.get_consumed()); parser_.reset(); return res; } - parse_ret_type on_read(std::string_view data, system::error_code& ec) + parse_ret_type on_read(system::error_code& ec) { // We arrive here in two states: // @@ -1148,7 +1137,7 @@ private: on_push_ = is_next_push(); if (on_push_) { - if (!resp3::parse(parser_, data, receive_adapter_, ec)) + if (!resp3::parse(parser_, read_buffer_, receive_adapter_, ec)) return std::make_pair(parse_result::needs_more, 0); if (ec) @@ -1162,7 +1151,7 @@ private: BOOST_ASSERT(reqs_.front() != nullptr); BOOST_ASSERT(reqs_.front()->expected_responses_ != 0); - if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec)) + if (!resp3::parse(parser_, read_buffer_, reqs_.front()->adapter_, ec)) return std::make_pair(parse_result::needs_more, 0); if (ec) { @@ -1205,11 +1194,8 @@ private: resp3_handshaker_type handshaker_; receiver_adapter_type receive_adapter_; - using dyn_buffer_type = asio::dynamic_string_buffer, std::allocator>; - config cfg_; std::string read_buffer_; - dyn_buffer_type dbuf_; std::string write_buffer_; reqs_type reqs_; resp3::parser parser_{}; @@ -1237,15 +1223,13 @@ public: explicit connection( executor_type ex, - asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, - std::size_t max_read_size = (std::numeric_limits::max)()); + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}); /// Contructs from a context. explicit connection( asio::io_context& ioc, - asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}, - std::size_t max_read_size = (std::numeric_limits::max)()); + asio::ssl::context ctx = asio::ssl::context{asio::ssl::context::tlsv12_client}); /// Returns the underlying executor. executor_type get_executor() noexcept diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 76727f69..0bd8188f 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -9,18 +9,12 @@ namespace boost::redis { -connection::connection( - executor_type ex, - asio::ssl::context ctx, - std::size_t max_read_size) -: impl_{ex, std::move(ctx), max_read_size} +connection::connection(executor_type ex, asio::ssl::context ctx) +: impl_{ex, std::move(ctx)} { } -connection::connection( - asio::io_context& ioc, - asio::ssl::context ctx, - std::size_t max_read_size) -: impl_{ioc.get_executor(), std::move(ctx), max_read_size} +connection::connection(asio::io_context& ioc, asio::ssl::context ctx) +: impl_{ioc.get_executor(), std::move(ctx)} { } void diff --git a/test/common.cpp b/test/common.cpp index ba39b184..8f56d207 100644 --- a/test/common.cpp +++ b/test/common.cpp @@ -52,6 +52,7 @@ boost::redis::config make_test_config() { boost::redis::config cfg; cfg.addr.host = get_server_hostname(); + cfg.max_read_size = 1000000; return cfg; } diff --git a/test/test_issue_181.cpp b/test/test_issue_181.cpp index 73871c88..5885573c 100644 --- a/test/test_issue_181.cpp +++ b/test/test_issue_181.cpp @@ -36,7 +36,7 @@ BOOST_AUTO_TEST_CASE(issue_181) auto const level = boost::redis::logger::level::debug; net::io_context ioc; auto ctx = net::ssl::context{net::ssl::context::tlsv12_client}; - basic_connection conn{ioc.get_executor(), std::move(ctx), 1000000}; + basic_connection conn{ioc.get_executor(), std::move(ctx)}; net::steady_timer timer{ioc}; timer.expires_after(std::chrono::seconds{1}); From f9d0679be54a4f65dfd2056eac656e2409c51ac2 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Sat, 3 May 2025 13:41:12 +0200 Subject: [PATCH 2/2] Splits the multiplexer out of the connection. --- include/boost/redis/adapter/any_adapter.hpp | 15 +- include/boost/redis/connection.hpp | 477 +++----------------- include/boost/redis/detail/multiplexer.hpp | 200 ++++++++ include/boost/redis/impl/connection.ipp | 2 +- include/boost/redis/impl/logger.ipp | 2 +- include/boost/redis/impl/multiplexer.ipp | 321 +++++++++++++ include/boost/redis/logger.hpp | 2 +- include/boost/redis/src.hpp | 3 +- test/test_low_level_sync_sans_io.cpp | 170 +++++++ 9 files changed, 766 insertions(+), 426 deletions(-) create mode 100644 include/boost/redis/detail/multiplexer.hpp create mode 100644 include/boost/redis/impl/multiplexer.ipp diff --git a/include/boost/redis/adapter/any_adapter.hpp b/include/boost/redis/adapter/any_adapter.hpp index f317fc1b..82b7e70f 100644 --- a/include/boost/redis/adapter/any_adapter.hpp +++ b/include/boost/redis/adapter/any_adapter.hpp @@ -18,14 +18,6 @@ namespace boost::redis { -namespace detail { - -// Forward decl -template -class basic_connection; - -} - /** @brief A type-erased reference to a response. * @ingroup high-level-api * @@ -39,9 +31,9 @@ class basic_connection; * co_await conn.async_exec(req, any_response(resp)); * ``` */ -class any_adapter -{ - using fn_type = std::function const&, system::error_code&)>; +class any_adapter { +public: + using fn_type = std::function; struct impl_t { fn_type adapt_fn; @@ -60,7 +52,6 @@ class any_adapter template friend class basic_connection; -public: /** * @brief Constructor. * diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index d43a00b1..ec7def78 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -44,15 +45,12 @@ #include -#include #include #include #include #include #include #include -#include -#include namespace boost::redis { namespace detail @@ -111,23 +109,28 @@ async_append_some( >(append_some_op {stream, buffer, size}, token, stream); } +template +using exec_notifier_type = + asio::experimental::channel; + template struct exec_op { - using req_info_type = typename Conn::req_info; - using adapter_type = typename Conn::adapter_type; + using req_info_type = typename multiplexer::elem; + using executor_type = typename Conn::executor_type; Conn* conn_ = nullptr; + std::shared_ptr> notifier_ = nullptr; std::shared_ptr info_ = nullptr; - asio::coroutine coro{}; + asio::coroutine coro_{}; template void operator()(Self& self , system::error_code = {}, std::size_t = 0) { - BOOST_ASIO_CORO_REENTER (coro) + BOOST_ASIO_CORO_REENTER (coro_) { // Check whether the user wants to wait for the connection to // be stablished. - if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) { + if (info_->get_request().get_config().cancel_if_not_connected && !conn_->is_open()) { BOOST_ASIO_CORO_YIELD asio::dispatch( asio::get_associated_immediate_executor(self, self.get_io_executor()), @@ -135,25 +138,22 @@ struct exec_op { return self.complete(error::not_connected, 0); } - conn_->add_request_info(info_); + conn_->mpx_.add(info_); + if (conn_->trigger_write()) { + conn_->writer_timer_.cancel(); + } EXEC_OP_WAIT: BOOST_ASIO_CORO_YIELD - info_->async_wait(std::move(self)); + notifier_->async_receive(std::move(self)); - if (info_->ec_) { - self.complete(info_->ec_, 0); + if (info_->get_error()) { + self.complete(info_->get_error(), 0); return; } - if (info_->stop_requested()) { - // Don't have to call remove_request as it has already - // been by cancel(exec). - return self.complete(asio::error::operation_aborted, 0); - } - if (is_cancelled(self)) { - if (!info_->is_waiting()) { + if (!conn_->mpx_.remove(info_)) { using c_t = asio::cancellation_type; auto const c = self.get_cancellation_state().cancelled(); if ((c & c_t::terminal) != c_t::none) { @@ -170,14 +170,13 @@ EXEC_OP_WAIT: goto EXEC_OP_WAIT; } } else { - // Cancelation can be honored. - conn_->remove_request(info_); + // Cancelation honored. self.complete(asio::error::operation_aborted, 0); return; } } - self.complete(info_->ec_, info_->read_size_); + self.complete(info_->get_error(), info_->get_read_size()); } } }; @@ -197,13 +196,22 @@ struct writer_op { BOOST_ASIO_CORO_REENTER (coro) for (;;) { - while (conn_->coalesce_requests()) { - if (conn_->use_ssl()) - BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self)); - else - BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self)); + while (conn_->mpx_.prepare_write() != 0) { + if (conn_->use_ssl()) { + BOOST_ASIO_CORO_YIELD + asio::async_write( + conn_->next_layer(), + asio::buffer(conn_->mpx_.get_write_buffer()), + std::move(self)); + } else { + BOOST_ASIO_CORO_YIELD + asio::async_write( + conn_->next_layer().next_layer(), + asio::buffer(conn_->mpx_.get_write_buffer()), + std::move(self)); + } - logger_.on_write(ec, conn_->write_buffer_); + logger_.on_write(ec, conn_->mpx_.get_write_buffer()); if (ec) { logger_.trace("writer_op (1)", ec); @@ -212,7 +220,7 @@ struct writer_op { return; } - conn_->on_write(); + conn_->mpx_.commit_write(); // A socket.close() may have been called while a // successful write might had already been queued, so we @@ -240,13 +248,14 @@ struct writer_op { template struct reader_op { - using parse_result = typename Conn::parse_result; - using parse_ret_type = typename Conn::parse_ret_type; using dyn_buffer_type = asio::dynamic_string_buffer, std::allocator>; + // TODO: Move this to config so the user can fine tune? + static constexpr std::size_t buffer_growth_hint = 4096; + Conn* conn_; Logger logger_; - parse_ret_type res_{parse_result::resp, 0}; + std::pair res_{std::make_pair(std::make_optional(false), 0)}; asio::coroutine coro{}; template @@ -259,20 +268,20 @@ struct reader_op { BOOST_ASIO_CORO_REENTER (coro) for (;;) { // Appends some data to the buffer if necessary. - if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) { + if (!res_.first.has_value() || conn_->mpx_.is_data_needed()) { if (conn_->use_ssl()) { BOOST_ASIO_CORO_YIELD async_append_some( conn_->next_layer(), - dyn_buffer_type{conn_->read_buffer_, conn_->cfg_.max_read_size}, - conn_->get_suggested_buffer_growth(), + dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size}, + conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint), std::move(self)); } else { BOOST_ASIO_CORO_YIELD async_append_some( conn_->next_layer().next_layer(), - dyn_buffer_type{conn_->read_buffer_, conn_->cfg_.max_read_size}, - conn_->get_suggested_buffer_growth(), + dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size}, + conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint), std::move(self)); } @@ -296,7 +305,7 @@ struct reader_op { } } - res_ = conn_->on_read(ec); + res_ = conn_->mpx_.commit_read(ec); if (ec) { logger_.trace("reader_op (3)", ec); conn_->cancel(operation::run); @@ -304,7 +313,7 @@ struct reader_op { return; } - if (res_.first == parse_result::push) { + if (res_.first.has_value() && res_.first.value()) { if (!conn_->receive_channel_.try_send(ec, res_.second)) { BOOST_ASIO_CORO_YIELD conn_->receive_channel_.async_send(ec, res_.second, std::move(self)); @@ -398,7 +407,7 @@ public: } } - conn_->reset(); + conn_->mpx_.reset(); // Note: Order is important here because the writer might // trigger an async_write before the async_hello thereby @@ -686,12 +695,17 @@ public: auto& adapter_impl = adapter.impl_; BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size, "Request and response have incompatible sizes."); - auto info = std::make_shared(req, std::move(adapter_impl.adapt_fn), get_executor()); + auto notifier = std::make_shared>(get_executor(), 1); + auto info = detail::make_elem(req, std::move(adapter_impl.adapt_fn)); + + info->set_done_callback([notifier]() { + notifier->try_send(std::error_code{}, 0); + }); return asio::async_compose < CompletionToken , void(system::error_code, std::size_t) - >(detail::exec_op{this, info}, token, writer_timer_); + >(detail::exec_op{this, notifier, info}, token, writer_timer_); } /** @brief Cancel operations. @@ -712,7 +726,7 @@ public: resv_.cancel(); break; case operation::exec: - cancel_unwritten_requests(); + mpx_.cancel_waiting(); break; case operation::reconnection: cfg_.reconnect_wait_interval = std::chrono::seconds::zero(); @@ -732,14 +746,14 @@ public: health_checker_.cancel(); cancel_run(); // run receive_channel_.cancel(); // receive - cancel_unwritten_requests(); // exec + mpx_.cancel_waiting(); // exec break; default: /* ignore */; } } auto run_is_canceled() const noexcept - { return cancel_run_called_; } + { return mpx_.get_cancel_run_state(); } /// Returns true if the connection was canceled. bool will_reconnect() const noexcept @@ -760,18 +774,15 @@ public: /// Returns a const reference to the next layer. auto const& next_layer() const noexcept { return *stream_; } + /// Sets the response object of `async_receive` operations. template void set_receive_response(Response& response) - { - using namespace boost::redis::adapter; - auto g = boost_redis_adapt(response); - receive_adapter_ = adapter::detail::make_adapter_wrapper(g); - } + { mpx_.set_receive_response(response); } /// Returns connection usage information. usage get_usage() const noexcept - { return usage_; } + { return mpx_.get_usage(); } private: using clock_type = std::chrono::steady_clock; @@ -782,220 +793,23 @@ private: using resolver_type = detail::resolver; using health_checker_type = detail::health_checker; using resp3_handshaker_type = detail::resp3_handshaker; - using adapter_type = std::function const&, system::error_code&)>; - using receiver_adapter_type = std::function const&, system::error_code&)>; - using exec_notifier_type = receive_channel_type; auto use_ssl() const noexcept { return cfg_.use_ssl;} - auto cancel_on_conn_lost() -> std::size_t - { - // Must return false if the request should be removed. - auto cond = [](auto const& ptr) - { - BOOST_ASSERT(ptr != nullptr); - - if (ptr->is_waiting()) { - return !ptr->req_->get_config().cancel_on_connection_lost; - } else { - return !ptr->req_->get_config().cancel_if_unresponded; - } - }; - - auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond); - - auto const ret = std::distance(point, std::end(reqs_)); - - std::for_each(point, std::end(reqs_), [](auto const& ptr) { - ptr->stop(); - }); - - reqs_.erase(point, std::end(reqs_)); - - std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return ptr->mark_waiting(); - }); - - return ret; - } - - auto cancel_unwritten_requests() -> std::size_t - { - auto f = [](auto const& ptr) - { - BOOST_ASSERT(ptr != nullptr); - return !ptr->is_waiting(); - }; - - auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f); - - auto const ret = std::distance(point, std::end(reqs_)); - - std::for_each(point, std::end(reqs_), [](auto const& ptr) { - ptr->stop(); - }); - - reqs_.erase(point, std::end(reqs_)); - return ret; - } - void cancel_run() { - // Protects the code below from being called more than - // once, see https://github.com/boostorg/redis/issues/181 - if (std::exchange(cancel_run_called_, true)) { - return; - } - close(); writer_timer_.cancel(); receive_channel_.cancel(); - cancel_on_conn_lost(); + mpx_.cancel_on_conn_lost(); } - void on_write() - { - // We have to clear the payload right after writing it to use it - // as a flag that informs there is no ongoing write. - write_buffer_.clear(); - - // Notice this must come before the for-each below. - cancel_push_requests(); - - // There is small optimization possible here: traverse only the - // partition of unwritten requests instead of them all. - std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer."); - if (ptr->is_staged()) { - ptr->mark_written(); - } - }); - } - - struct req_info { - public: - using node_type = resp3::basic_node; - using wrapped_adapter_type = std::function; - - explicit req_info(request const& req, adapter_type adapter, executor_type ex) - : notifier_{ex, 1} - , req_{&req} - , adapter_{} - , expected_responses_{req.get_expected_responses()} - , status_{status::waiting} - , ec_{{}} - , read_size_{0} - { - adapter_ = [this, adapter](node_type const& nd, system::error_code& ec) - { - auto const i = req_->get_expected_responses() - expected_responses_; - adapter(i, nd, ec); - }; - } - - auto proceed() - { - notifier_.try_send(std::error_code{}, 0); - } - - void stop() - { - notifier_.close(); - } - - [[nodiscard]] auto is_waiting() const noexcept - { return status_ == status::waiting; } - - [[nodiscard]] auto is_written() const noexcept - { return status_ == status::written; } - - [[nodiscard]] auto is_staged() const noexcept - { return status_ == status::staged; } - - void mark_written() noexcept - { status_ = status::written; } - - void mark_staged() noexcept - { status_ = status::staged; } - - void mark_waiting() noexcept - { status_ = status::waiting; } - - [[nodiscard]] auto stop_requested() const noexcept - { return !notifier_.is_open();} - - template - auto async_wait(CompletionToken&& token) - { - return notifier_.async_receive(std::forward(token)); - } - - //private: - enum class status - { waiting - , staged - , written - }; - - exec_notifier_type notifier_; - request const* req_; - wrapped_adapter_type adapter_; - - // Contains the number of commands that haven't been read yet. - std::size_t expected_responses_; - status status_; - - system::error_code ec_; - std::size_t read_size_; - }; - - void remove_request(std::shared_ptr const& info) - { - reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info)); - } - - using reqs_type = std::deque>; - template friend struct detail::reader_op; template friend struct detail::writer_op; template friend struct detail::exec_op; template friend class detail::run_op; - void cancel_push_requests() - { - auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0); - }); - - std::for_each(point, std::end(reqs_), [](auto const& ptr) { - ptr->proceed(); - }); - - reqs_.erase(point, std::end(reqs_)); - } - - [[nodiscard]] bool is_writing() const noexcept - { - return !write_buffer_.empty(); - } - - void add_request_info(std::shared_ptr const& info) - { - reqs_.push_back(info); - - if (info->req_->has_hello_priority()) { - auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) { - return e->is_waiting(); - }); - - std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend); - } - - if (is_open() && !is_writing()) - writer_timer_.cancel(); - } - template auto reader(Logger l, CompletionToken&& token) { @@ -1015,38 +829,6 @@ private: >(detail::writer_op{this, l}, std::forward(token), writer_timer_); } - [[nodiscard]] bool coalesce_requests() - { - // Coalesces the requests and marks them staged. After a - // successful write staged requests will be marked as written. - auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) { - return !ri->is_waiting(); - }); - - std::for_each(point, std::cend(reqs_), [this](auto const& ri) { - // Stage the request. - write_buffer_ += ri->req_->payload(); - ri->mark_staged(); - usage_.commands_sent += ri->expected_responses_; - }); - - usage_.bytes_sent += std::size(write_buffer_); - - return point != std::cend(reqs_); - } - - bool is_waiting_response() const noexcept - { - if (std::empty(reqs_)) - return false; - - // Under load and on low-latency networks we might start - // receiving responses before the write operation completed and - // the request is still maked as staged and not written. See - // https://github.com/boostorg/redis/issues/170 - return !reqs_.front()->is_waiting(); - } - void close() { if (stream_->next_layer().is_open()) { @@ -1058,126 +840,9 @@ private: auto is_open() const noexcept { return stream_->next_layer().is_open(); } auto& lowest_layer() noexcept { return stream_->lowest_layer(); } - auto is_next_push() + [[nodiscard]] bool trigger_write() const noexcept { - BOOST_ASSERT(!read_buffer_.empty()); - - // Useful links to understand the heuristics below. - // - // - https://github.com/redis/redis/issues/11784 - // - https://github.com/redis/redis/issues/6426 - // - https://github.com/boostorg/redis/issues/170 - - // The message's resp3 type is a push. - if (resp3::to_type(read_buffer_.front()) == resp3::type::push) - return true; - - // This is non-push type and the requests queue is empty. I have - // noticed this is possible, for example with -MISCONF. I don't - // know why they are not sent with a push type so we can - // distinguish them from responses to commands. If we are lucky - // enough to receive them when the command queue is empty they - // can be treated as server pushes, otherwise it is impossible - // to handle them properly - if (reqs_.empty()) - return true; - - // The request does not expect any response but we got one. This - // may happen if for example, subscribe with wrong syntax. - if (reqs_.front()->expected_responses_ == 0) - return true; - - // Added to deal with MONITOR and also to fix PR170 which - // happens under load and on low-latency networks, where we - // might start receiving responses before the write operation - // completed and the request is still maked as staged and not - // written. - return reqs_.front()->is_waiting(); - } - - auto get_suggested_buffer_growth() const noexcept - { - return parser_.get_suggested_buffer_growth(4096); - } - - enum class parse_result { needs_more, push, resp }; - - using parse_ret_type = std::pair; - - parse_ret_type on_finish_parsing(parse_result t) - { - if (t == parse_result::push) { - usage_.pushes_received += 1; - usage_.push_bytes_received += parser_.get_consumed(); - } else { - usage_.responses_received += 1; - usage_.response_bytes_received += parser_.get_consumed(); - } - - on_push_ = false; - read_buffer_.erase(0, parser_.get_consumed()); - auto const res = std::make_pair(t, parser_.get_consumed()); - parser_.reset(); - return res; - } - - parse_ret_type on_read(system::error_code& ec) - { - // We arrive here in two states: - // - // 1. While we are parsing a message. In this case we - // don't want to determine the type of the message in the - // buffer (i.e. response vs push) but leave it untouched - // until the parsing of a complete message ends. - // - // 2. On a new message, in which case we have to determine - // whether the next message is a push or a response. - // - if (!on_push_) // Prepare for new message. - on_push_ = is_next_push(); - - if (on_push_) { - if (!resp3::parse(parser_, read_buffer_, receive_adapter_, ec)) - return std::make_pair(parse_result::needs_more, 0); - - if (ec) - return std::make_pair(parse_result::push, 0); - - return on_finish_parsing(parse_result::push); - } - - BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)"); - BOOST_ASSERT(!reqs_.empty()); - BOOST_ASSERT(reqs_.front() != nullptr); - BOOST_ASSERT(reqs_.front()->expected_responses_ != 0); - - if (!resp3::parse(parser_, read_buffer_, reqs_.front()->adapter_, ec)) - return std::make_pair(parse_result::needs_more, 0); - - if (ec) { - reqs_.front()->ec_ = ec; - reqs_.front()->proceed(); - return std::make_pair(parse_result::resp, 0); - } - - reqs_.front()->read_size_ += parser_.get_consumed(); - - if (--reqs_.front()->expected_responses_ == 0) { - // Done with this request. - reqs_.front()->proceed(); - reqs_.pop_front(); - } - - return on_finish_parsing(parse_result::resp); - } - - void reset() - { - write_buffer_.clear(); - read_buffer_.clear(); - parser_.reset(); - on_push_ = false; - cancel_run_called_ = false; + return is_open() && !mpx_.is_writing(); } asio::ssl::context ctx_; @@ -1192,17 +857,9 @@ private: detail::connector ctor_; health_checker_type health_checker_; resp3_handshaker_type handshaker_; - receiver_adapter_type receive_adapter_; config cfg_; - std::string read_buffer_; - std::string write_buffer_; - reqs_type reqs_; - resp3::parser parser_{}; - bool on_push_ = false; - bool cancel_run_called_ = false; - - usage usage_; + detail::multiplexer mpx_; }; /** \brief A basic_connection that type erases the executor. diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp new file mode 100644 index 00000000..1c799ed3 --- /dev/null +++ b/include/boost/redis/detail/multiplexer.hpp @@ -0,0 +1,200 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_MULTIPLEXER_HPP +#define BOOST_REDIS_MULTIPLEXER_HPP + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace boost::redis +{ + +class request; + +namespace detail { + +using tribool = std::optional; + +struct multiplexer { + using adapter_type = std::function; + using pipeline_adapter_type = std::function; + + struct elem { + public: + explicit elem(request const& req, pipeline_adapter_type adapter); + + void set_done_callback(std::function f) noexcept + { done_ = std::move(f); }; + + auto notify_done() noexcept + { done_(); } + + auto notify_error(system::error_code ec) noexcept -> void; + + [[nodiscard]] + auto is_waiting() const noexcept + { return status_ == status::waiting; } + + [[nodiscard]] + auto is_written() const noexcept + { return status_ == status::written; } + + [[nodiscard]] + auto is_staged() const noexcept + { return status_ == status::staged; } + + void mark_written() noexcept + { status_ = status::written; } + + void mark_staged() noexcept + { status_ = status::staged; } + + void mark_waiting() noexcept + { status_ = status::waiting; } + + auto get_error() const -> system::error_code const& + { return ec_; } + + auto get_request() const -> request const& + { return *req_; } + + auto get_read_size() const -> std::size_t + { return read_size_; } + + auto get_remaining_responses() const -> std::size_t + { return remaining_responses_; } + + auto commit_response(std::size_t read_size) -> void; + + auto get_adapter() -> adapter_type& + { return adapter_; } + + private: + enum class status + { waiting + , staged + , written + }; + + request const* req_; + adapter_type adapter_; + + std::function done_; + + // Contains the number of commands that haven't been read yet. + std::size_t remaining_responses_; + status status_; + + system::error_code ec_; + std::size_t read_size_; + }; + + auto remove(std::shared_ptr const& ptr) -> bool; + + [[nodiscard]] + auto prepare_write() -> std::size_t; + + // Returns the number of requests that have been released because + // they don't have a response e.g. SUBSCRIBE. + auto commit_write() -> std::size_t; + + [[nodiscard]] + auto commit_read(system::error_code& ec) -> std::pair; + + auto add(std::shared_ptr const& ptr) -> void; + auto reset() -> void; + + [[nodiscard]] + auto const& get_parser() const noexcept + { return parser_; } + + //[[nodiscard]] + auto cancel_waiting() -> std::size_t; + + //[[nodiscard]] + auto cancel_on_conn_lost() -> std::size_t; + + [[nodiscard]] + auto get_cancel_run_state() const noexcept -> bool + { return cancel_run_called_; } + + [[nodiscard]] + auto get_write_buffer() noexcept -> std::string_view + { return std::string_view{write_buffer_}; } + + [[nodiscard]] + auto get_read_buffer() noexcept -> std::string& + { return read_buffer_; } + + [[nodiscard]] + auto is_data_needed() const noexcept -> bool + { return std::empty(read_buffer_); } + + // TODO: Change signature to receive an adapter instead of a + // response. + template + void set_receive_response(Response& response) + { + using namespace boost::redis::adapter; + auto g = boost_redis_adapt(response); + receive_adapter_ = adapter::detail::make_adapter_wrapper(g); + } + + [[nodiscard]] + auto get_usage() const noexcept -> usage + { return usage_;} + + [[nodiscard]] + auto is_writing() const noexcept -> bool; + +private: + [[nodiscard]] + auto is_waiting_response() const noexcept -> bool; + + [[nodiscard]] + auto on_finish_parsing(bool is_push) -> std::size_t; + + [[nodiscard]] + auto is_next_push() const noexcept -> bool; + + // Releases the number of requests that have been released. + [[nodiscard]] + auto release_push_requests() -> std::size_t; + + std::string read_buffer_; + std::string write_buffer_; + std::deque> reqs_; + resp3::parser parser_{}; + bool on_push_ = false; + bool cancel_run_called_ = false; + usage usage_; + adapter_type receive_adapter_; +}; + +auto +make_elem( + request const& req, + multiplexer::pipeline_adapter_type adapter) -> std::shared_ptr; + +} // detail +} // boost::redis + +#endif // BOOST_REDIS_MULTIPLEXER_HPP diff --git a/include/boost/redis/impl/connection.ipp b/include/boost/redis/impl/connection.ipp index 0bd8188f..a798223d 100644 --- a/include/boost/redis/impl/connection.ipp +++ b/include/boost/redis/impl/connection.ipp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) diff --git a/include/boost/redis/impl/logger.ipp b/include/boost/redis/impl/logger.ipp index 374fe919..b6931cfa 100644 --- a/include/boost/redis/impl/logger.ipp +++ b/include/boost/redis/impl/logger.ipp @@ -74,7 +74,7 @@ void logger::on_ssl_handshake(system::error_code const& ec) void logger::on_write( system::error_code const& ec, - std::string const& payload) + std::string_view payload) { if (level_ < level::info) return; diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp new file mode 100644 index 00000000..2c510ec8 --- /dev/null +++ b/include/boost/redis/impl/multiplexer.ipp @@ -0,0 +1,321 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include + +#include + +namespace boost::redis::detail +{ + +multiplexer::elem::elem(request const& req, pipeline_adapter_type adapter) +: req_{&req} +, adapter_{} +, remaining_responses_{req.get_expected_responses()} +, status_{status::waiting} +, ec_{{}} +, read_size_{0} +{ + adapter_ = [this, adapter](resp3::node_view const& nd, system::error_code& ec) + { + auto const i = req_->get_expected_responses() - remaining_responses_; + adapter(i, nd, ec); + }; +} + +auto multiplexer::elem::notify_error(system::error_code ec) noexcept -> void +{ + if (!ec_) { + ec_ = ec; + } + + done_(); +} + +auto multiplexer::elem::commit_response(std::size_t read_size) -> void +{ + read_size_ += read_size; + --remaining_responses_; +} + +bool multiplexer::remove(std::shared_ptr const& ptr) +{ + if (ptr->is_waiting()) { + reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), ptr)); + return true; + } + + return false; +} + +std::size_t multiplexer::commit_write() +{ + // We have to clear the payload right after writing it to use it + // as a flag that informs there is no ongoing write. + write_buffer_.clear(); + + // There is small optimization possible here: traverse only the + // partition of unwritten requests instead of them all. + std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { + BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer."); + if (ptr->is_staged()) { + ptr->mark_written(); + } + }); + + return release_push_requests(); +} + +void multiplexer::add(std::shared_ptr const& info) +{ + reqs_.push_back(info); + + if (info->get_request().has_hello_priority()) { + auto rend = + std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) { + return e->is_waiting(); + }); + + std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend); + } +} + +std::pair +multiplexer::commit_read(system::error_code& ec) +{ + // We arrive here in two states: + // + // 1. While we are parsing a message. In this case we + // don't want to determine the type of the message in the + // buffer (i.e. response vs push) but leave it untouched + // until the parsing of a complete message ends. + // + // 2. On a new message, in which case we have to determine + // whether the next messag is a push or a response. + // + if (!on_push_) // Prepare for new message. + on_push_ = is_next_push(); + + if (on_push_) { + if (!resp3::parse(parser_, read_buffer_, receive_adapter_, ec)) + return std::make_pair(std::nullopt, 0); + + if (ec) + return std::make_pair(std::make_optional(true), 0); + + auto const size = on_finish_parsing(true); + return std::make_pair(std::make_optional(true), size); + } + + BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)"); + BOOST_ASSERT(!reqs_.empty()); + BOOST_ASSERT(reqs_.front() != nullptr); + BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0); + + if (!resp3::parse(parser_, read_buffer_, reqs_.front()->get_adapter(), ec)) + return std::make_pair(std::nullopt, 0); + + if (ec) { + reqs_.front()->notify_error(ec); + return std::make_pair(std::make_optional(false), 0); + } + + reqs_.front()->commit_response(parser_.get_consumed()); + if (reqs_.front()->get_remaining_responses() == 0) { + // Done with this request. + reqs_.front()->notify_done(); + reqs_.pop_front(); + } + + auto const size = on_finish_parsing(false); + return std::make_pair(std::make_optional(false), size); +} + +void multiplexer::reset() +{ + write_buffer_.clear(); + read_buffer_.clear(); + parser_.reset(); + on_push_ = false; + cancel_run_called_ = false; +} + +std::size_t multiplexer::prepare_write() +{ + // Coalesces the requests and marks them staged. After a + // successful write staged requests will be marked as written. + auto const point = + std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) { + return !ri->is_waiting(); + }); + + std::for_each(point, std::cend(reqs_), [this](auto const& ri) { + // Stage the request. + write_buffer_ += ri->get_request().payload(); + ri->mark_staged(); + usage_.commands_sent += ri->get_request().get_commands(); + }); + + usage_.bytes_sent += std::size(write_buffer_); + + auto const d = std::distance(point, std::cend(reqs_)); + return static_cast(d); +} + + +std::size_t multiplexer::cancel_waiting() +{ + auto f = [](auto const& ptr) + { + BOOST_ASSERT(ptr != nullptr); + return !ptr->is_waiting(); + }; + + auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f); + + auto const ret = std::distance(point, std::end(reqs_)); + + std::for_each(point, std::end(reqs_), [](auto const& ptr) { + ptr->notify_error({asio::error::operation_aborted}); + }); + + reqs_.erase(point, std::end(reqs_)); + return ret; +} + +auto multiplexer::cancel_on_conn_lost() -> std::size_t +{ + // Protects the code below from being called more than + // once, see https://github.com/boostorg/redis/issues/181 + if (std::exchange(cancel_run_called_, true)) { + return 0; + } + + // Must return false if the request should be removed. + auto cond = [](auto const& ptr) + { + BOOST_ASSERT(ptr != nullptr); + + if (ptr->is_waiting()) { + return !ptr->get_request().get_config().cancel_on_connection_lost; + } else { + return !ptr->get_request().get_config().cancel_if_unresponded; + } + }; + + auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond); + + auto const ret = std::distance(point, std::end(reqs_)); + + std::for_each(point, std::end(reqs_), [](auto const& ptr) { + ptr->notify_error({asio::error::operation_aborted}); + }); + + reqs_.erase(point, std::end(reqs_)); + + std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { + return ptr->mark_waiting(); + }); + + return ret; +} + +std::size_t multiplexer::on_finish_parsing(bool is_push) +{ + if (is_push) { + usage_.pushes_received += 1; + usage_.push_bytes_received += parser_.get_consumed(); + } else { + usage_.responses_received += 1; + usage_.response_bytes_received += parser_.get_consumed(); + } + + on_push_ = false; + read_buffer_.erase(0, parser_.get_consumed()); + auto const size = parser_.get_consumed(); + parser_.reset(); + return size; +} + +bool multiplexer::is_next_push() const noexcept +{ + BOOST_ASSERT(!read_buffer_.empty()); + + // Useful links to understand the heuristics below. + // + // - https://github.com/redis/redis/issues/11784 + // - https://github.com/redis/redis/issues/6426 + // - https://github.com/boostorg/redis/issues/170 + + // The message's resp3 type is a push. + if (resp3::to_type(read_buffer_.front()) == resp3::type::push) + return true; + + // This is non-push type and the requests queue is empty. I have + // noticed this is possible, for example with -MISCONF. I don't + // know why they are not sent with a push type so we can + // distinguish them from responses to commands. If we are lucky + // enough to receive them when the command queue is empty they + // can be treated as server pushes, otherwise it is impossible + // to handle them properly + if (reqs_.empty()) + return true; + + // The request does not expect any response but we got one. This + // may happen if for example, subscribe with wrong syntax. + if (reqs_.front()->get_remaining_responses() == 0) + return true; + + // Added to deal with MONITOR and also to fix PR170 which + // happens under load and on low-latency networks, where we + // might start receiving responses before the write operation + // completed and the request is still maked as staged and not + // written. + return reqs_.front()->is_waiting(); +} + +std::size_t multiplexer::release_push_requests() +{ + auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { + return !(ptr->is_written() && ptr->get_request().get_expected_responses() == 0); + }); + + std::for_each(point, std::end(reqs_), [](auto const& ptr) { + ptr->notify_done(); + }); + + auto const d = std::distance(point, std::end(reqs_)); + reqs_.erase(point, std::end(reqs_)); + return static_cast(d); +} + +bool multiplexer::is_waiting_response() const noexcept +{ + if (std::empty(reqs_)) + return false; + + // Under load and on low-latency networks we might start + // receiving responses before the write operation completed and + // the request is still maked as staged and not written. See + // https://github.com/boostorg/redis/issues/170 + return !reqs_.front()->is_waiting(); +} + +bool multiplexer::is_writing() const noexcept +{ + return !write_buffer_.empty(); +} + +auto +make_elem( + request const& req, + multiplexer::pipeline_adapter_type adapter) -> std::shared_ptr +{ + return std::make_shared(req, std::move(adapter)); +} + +} diff --git a/include/boost/redis/logger.hpp b/include/boost/redis/logger.hpp index 7558436e..79e55b50 100644 --- a/include/boost/redis/logger.hpp +++ b/include/boost/redis/logger.hpp @@ -97,7 +97,7 @@ public: * @param ec Error code returned by the write operation. * @param payload The payload written to the socket. */ - void on_write(system::error_code const& ec, std::string const& payload); + void on_write(system::error_code const& ec, std::string_view payload); /** @brief Called when the read operation completes. * @ingroup high-level-api diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index 5ba662e0..82455ab4 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 170ff647..a09153ab 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -5,8 +5,12 @@ */ #include +#include #include +#include +#include #include +#include #define BOOST_TEST_MODULE conn-quit #include #include @@ -20,6 +24,11 @@ using boost::redis::adapter::adapt2; using boost::redis::adapter::result; using boost::redis::resp3::detail::deserialize; using boost::redis::ignore_t; +using boost::redis::detail::multiplexer; +using boost::redis::generic_response; +using boost::redis::resp3::node; +using boost::redis::resp3::to_string; +using boost::redis::any_adapter; BOOST_AUTO_TEST_CASE(low_level_sync_sans_io) { @@ -235,3 +244,164 @@ BOOST_AUTO_TEST_CASE(issue_233_optional_array_with_null) exit(EXIT_FAILURE); } } + +//=========================================================================== +// Multiplexer + +std::ostream& operator<<(std::ostream& os, node const& nd) +{ + os << to_string(nd.data_type) << "\n" + << nd.aggregate_size << "\n" + << nd.depth << "\n" + << nd.value; + + return os; +} + +BOOST_AUTO_TEST_CASE(multiplexer_push) +{ + multiplexer mpx; + generic_response resp; + mpx.set_receive_response(resp); + + mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n"; + + boost::system::error_code ec; + auto const ret = mpx.commit_read(ec); + + BOOST_TEST(ret.first.value()); + BOOST_CHECK_EQUAL(ret.second, 16); + + // TODO: Provide operator << for generic_response so we can compare + // the whole vector. + BOOST_CHECK_EQUAL(resp.value().size(), 3); + BOOST_CHECK_EQUAL(resp.value().at(1).value, "one"); + BOOST_CHECK_EQUAL(resp.value().at(2).value, "two"); + + for (auto const& e: resp.value()) + std::cout << e << std::endl; +} + +BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) +{ + multiplexer mpx; + generic_response resp; + mpx.set_receive_response(resp); + + // Only part of the message. + mpx.get_read_buffer() = ">2\r\n+one\r"; + + boost::system::error_code ec; + auto ret = mpx.commit_read(ec); + + BOOST_TEST(!ret.first.has_value()); + + mpx.get_read_buffer().append("\n+two\r\n"); + ret = mpx.commit_read(ec); + + BOOST_TEST(ret.first.value()); + BOOST_CHECK_EQUAL(ret.second, 16); + + // TODO: Provide operator << for generic_response so we can compare + // the whole vector. + BOOST_CHECK_EQUAL(resp.value().size(), 3); + BOOST_CHECK_EQUAL(resp.value().at(1).value, "one"); + BOOST_CHECK_EQUAL(resp.value().at(2).value, "two"); +} + +struct test_item { + request req; + generic_response resp; + std::shared_ptr elem_ptr; + bool done = false; + + test_item(bool cmd_with_response = true) + { + // The exact command is irrelevant because it is not being sent + // to Redis. + req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg"); + + elem_ptr = std::make_shared(req, any_adapter(resp).impl_.adapt_fn); + + elem_ptr->set_done_callback([this]() { + done = true; + }); + } +}; + +BOOST_AUTO_TEST_CASE(multiplexer_pipeline) +{ + test_item item1{}; + test_item item2{false}; + test_item item3{}; + + // Add some requests to the multiplexer. + multiplexer mpx; + mpx.add(item1.elem_ptr); + mpx.add(item3.elem_ptr); + mpx.add(item2.elem_ptr); + + // These requests haven't been written yet so their statuses should + // be "waiting.". + BOOST_TEST(item1.elem_ptr->is_waiting()); + BOOST_TEST(item2.elem_ptr->is_waiting()); + BOOST_TEST(item3.elem_ptr->is_waiting()); + + // There are three requests to coalesce, a second call should do + // nothing. + BOOST_CHECK_EQUAL(mpx.prepare_write(), 3); + BOOST_CHECK_EQUAL(mpx.prepare_write(), 0); + + // After coalescing the requests for writing their statuses should + // be changed to "staged". + BOOST_TEST(item1.elem_ptr->is_staged()); + BOOST_TEST(item2.elem_ptr->is_staged()); + BOOST_TEST(item3.elem_ptr->is_staged()); + + // There are no waiting requests to cancel since they are all + // staged. + BOOST_CHECK_EQUAL(mpx.cancel_waiting(), 0); + + // Since the requests haven't been sent (written) the done + // callback should not have been called yet. + BOOST_TEST(!item1.done); + BOOST_TEST(!item2.done); + BOOST_TEST(!item3.done); + + // The commit_write call informs the multiplexer the payload was + // sent (e.g. written to the socket). This step releases requests + // that has no response. + BOOST_CHECK_EQUAL(mpx.commit_write(), 1); + + // The staged status should now have changed to written. + BOOST_TEST(item1.elem_ptr->is_written()); + BOOST_TEST(item2.elem_ptr->is_written()); + BOOST_TEST(item3.elem_ptr->is_written()); + + // The done status should still be unchanged on requests that + // expect a response. + BOOST_TEST(!item1.done); + BOOST_TEST( item2.done); + BOOST_TEST(!item3.done); + + // Simulates a socket read by putting some data in the read buffer. + mpx.get_read_buffer().append("+one\r\n"); + + // Informs the multiplexer the read operation is concluded. + boost::system::error_code ec; + auto const ret = mpx.commit_read(ec); + + // The read operation should have been successfull. + BOOST_TEST(ret.first.has_value()); + BOOST_TEST(ret.second != 0); + + // The read buffer should also be empty now + BOOST_TEST(mpx.get_read_buffer().empty()); + + // The last request still did not get a response. + BOOST_TEST( item1.done); + BOOST_TEST( item2.done); + BOOST_TEST(!item3.done); + + // TODO: Check the first request was removed from the queue. +}