diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index c7e5ecca..844be948 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -253,79 +253,82 @@ struct reader_op { Conn* conn_; Logger logger_; - std::pair res_{std::make_pair(std::make_optional(false), 0)}; + std::pair res_{std::make_pair(std::nullopt, 0)}; asio::coroutine coro{}; template void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) { - ignore_unused(n); - BOOST_ASIO_CORO_REENTER(coro) for (;;) { // Appends some data to the buffer if necessary. - if (!res_.first.has_value() || conn_->mpx_.is_data_needed()) { - if (conn_->use_ssl()) { - BOOST_ASIO_CORO_YIELD - async_append_some( - conn_->next_layer(), - dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size}, - conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint), - std::move(self)); - } else { - BOOST_ASIO_CORO_YIELD - async_append_some( - conn_->next_layer().next_layer(), - dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size}, - conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint), - std::move(self)); - } - - logger_.on_read(ec, n); - - // The connection is not viable after an error. - if (ec) { - logger_.trace("reader_op (1)", ec); - conn_->cancel(operation::run); - self.complete(ec); - return; - } - - // Somebody might have canceled implicitly or explicitly - // while we were suspended and after queueing so we have to - // check. - if (!conn_->is_open()) { - logger_.trace("reader_op (2): connection is closed."); - self.complete(ec); - return; - } + if (conn_->use_ssl()) { + BOOST_ASIO_CORO_YIELD + async_append_some( + conn_->next_layer(), + dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size}, + conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint), + std::move(self)); + } else { + BOOST_ASIO_CORO_YIELD + async_append_some( + conn_->next_layer().next_layer(), + dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size}, + conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint), + std::move(self)); } - res_ = conn_->mpx_.commit_read(ec); + logger_.on_read(ec, n); + + // The connection is not viable after an error. if (ec) { - logger_.trace("reader_op (3)", ec); + logger_.trace("reader_op (1)", ec); conn_->cancel(operation::run); self.complete(ec); return; } - if (res_.first.has_value() && res_.first.value()) { - if (!conn_->receive_channel_.try_send(ec, res_.second)) { - BOOST_ASIO_CORO_YIELD - conn_->receive_channel_.async_send(ec, res_.second, std::move(self)); - } + // The connection might have been canceled while this op was + // suspended or after queueing so we have to check. + if (!conn_->is_open()) { + logger_.trace("reader_op (2): connection is closed."); + self.complete(ec); + return; + } + + while (!conn_->mpx_.get_read_buffer().empty()) { + res_ = conn_->mpx_.consume_next(ec); if (ec) { - logger_.trace("reader_op (4)", ec); + logger_.trace("reader_op (3)", ec); conn_->cancel(operation::run); self.complete(ec); return; } - if (!conn_->is_open()) { - logger_.trace("reader_op (5): connection is closed."); - self.complete(asio::error::operation_aborted); - return; + if (!res_.first.has_value()) { + // More data is needed. + break; + } + + if (res_.first.value()) { + if (!conn_->receive_channel_.try_send(ec, res_.second)) { + BOOST_ASIO_CORO_YIELD + conn_->receive_channel_.async_send(ec, res_.second, std::move(self)); + } + + if (ec) { + logger_.trace("reader_op (4)", ec); + conn_->cancel(operation::run); + self.complete(ec); + return; + } + + if (!conn_->is_open()) { + logger_.trace("reader_op (5): connection is closed."); + self.complete(asio::error::operation_aborted); + return; + } } } } diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index 96ebe4fd..5818db2d 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -113,8 +113,10 @@ struct multiplexer { // they don't have a response e.g. SUBSCRIBE. auto commit_write() -> std::size_t; + // If the tribool contains no value more data is needed, otherwise + // if the value is true the message consumed is a push. [[nodiscard]] - auto commit_read(system::error_code& ec) -> std::pair; + auto consume_next(system::error_code& ec) -> std::pair; auto add(std::shared_ptr const& ptr) -> void; auto reset() -> void; @@ -150,9 +152,9 @@ struct multiplexer { } [[nodiscard]] - auto is_data_needed() const noexcept -> bool + auto get_read_buffer() const noexcept -> std::string const& { - return std::empty(read_buffer_); + return read_buffer_; } // TODO: Change signature to receive an adapter instead of a diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp index 945222dd..5dae6c08 100644 --- a/include/boost/redis/impl/multiplexer.ipp +++ b/include/boost/redis/impl/multiplexer.ipp @@ -81,7 +81,7 @@ void multiplexer::add(std::shared_ptr const& info) } } -std::pair multiplexer::commit_read(system::error_code& ec) +std::pair multiplexer::consume_next(system::error_code& ec) { // We arrive here in two states: // diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 70d42c5e..768fbfd2 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -261,7 +261,7 @@ BOOST_AUTO_TEST_CASE(multiplexer_push) mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n"; boost::system::error_code ec; - auto const ret = mpx.commit_read(ec); + auto const ret = mpx.consume_next(ec); BOOST_TEST(ret.first.value()); BOOST_CHECK_EQUAL(ret.second, 16u); @@ -286,12 +286,12 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) mpx.get_read_buffer() = ">2\r\n+one\r"; boost::system::error_code ec; - auto ret = mpx.commit_read(ec); + auto ret = mpx.consume_next(ec); BOOST_TEST(!ret.first.has_value()); mpx.get_read_buffer().append("\n+two\r\n"); - ret = mpx.commit_read(ec); + ret = mpx.consume_next(ec); BOOST_TEST(ret.first.value()); BOOST_CHECK_EQUAL(ret.second, 16u); @@ -381,9 +381,9 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline) // Simulates a socket read by putting some data in the read buffer. mpx.get_read_buffer().append("+one\r\n"); - // Informs the multiplexer the read operation is concluded. + // Consumes the next message in the read buffer. boost::system::error_code ec; - auto const ret = mpx.commit_read(ec); + auto const ret = mpx.consume_next(ec); // The read operation should have been successfull. BOOST_TEST(ret.first.has_value());