diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index 1c495759..1e24d310 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -17,11 +17,9 @@ #include -#include #include #include #include -#include #include #include @@ -31,7 +29,13 @@ class request; namespace detail { -using tribool = std::optional; +// Return type of the multiplexer::consume_next function +enum class consume_result +{ + needs_more, // consume_next didn't have enough data + got_response, // got a response to a regular command, vs. a push + got_push, // got a response to a push +}; class multiplexer { public: @@ -121,11 +125,9 @@ public: // they don't have a response e.g. SUBSCRIBE. auto commit_write() -> std::size_t; - // If the tribool contains no value more data is needed, otherwise - // if the value is true the message consumed is a push. [[nodiscard]] auto consume_next(std::string_view data, system::error_code& ec) - -> std::pair; + -> std::pair; auto add(std::shared_ptr const& ptr) -> void; auto reset() -> void; @@ -166,9 +168,6 @@ public: auto is_writing() const noexcept -> bool; private: - [[nodiscard]] - auto is_waiting_response() const noexcept -> bool; - void commit_usage(bool is_push, std::size_t size); [[nodiscard]] @@ -179,7 +178,7 @@ private: auto release_push_requests() -> std::size_t; [[nodiscard]] - tribool consume_next_impl(std::string_view data, system::error_code& ec); + consume_result consume_next_impl(std::string_view data, system::error_code& ec); std::string write_buffer_; std::deque> reqs_; diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index f217dbb9..6892ebc0 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -48,7 +48,7 @@ private: action action_after_resume_; action::type next_read_type_ = action::type::append_some; multiplexer* mpx_ = nullptr; - std::pair res_{std::make_pair(std::nullopt, 0)}; + std::pair res_{consume_result::needs_more, 0u}; }; } // namespace boost::redis::detail diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp index 12f900e3..1838d3c2 100644 --- a/include/boost/redis/impl/multiplexer.ipp +++ b/include/boost/redis/impl/multiplexer.ipp @@ -7,6 +7,8 @@ #include #include +#include + #include namespace boost::redis::detail { @@ -76,7 +78,7 @@ void multiplexer::add(std::shared_ptr const& info) } } -tribool multiplexer::consume_next_impl(std::string_view data, system::error_code& ec) +consume_result multiplexer::consume_next_impl(std::string_view data, system::error_code& ec) { // We arrive here in two states: // @@ -86,7 +88,7 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code // until the parsing of a complete message ends. // // 2. On a new message, in which case we have to determine - // whether the next messag is a push or a response. + // whether the next message is a push or a response. // BOOST_ASSERT(!data.empty()); @@ -95,25 +97,23 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code if (on_push_) { if (!resp3::parse(parser_, data, receive_adapter_, ec)) - return std::nullopt; + return consume_result::needs_more; - return std::make_optional(true); + return consume_result::got_push; } - BOOST_ASSERT_MSG( - is_waiting_response(), - "Not waiting for a response (using MONITOR command perhaps?)"); BOOST_ASSERT(!reqs_.empty()); BOOST_ASSERT(reqs_.front() != nullptr); BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0); + BOOST_ASSERT(!reqs_.front()->is_waiting()); if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec)) - return std::nullopt; + return consume_result::needs_more; if (ec) { reqs_.front()->notify_error(ec); reqs_.pop_front(); - return std::make_optional(false); + return consume_result::got_response; } reqs_.front()->commit_response(parser_.get_consumed()); @@ -123,10 +123,10 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code reqs_.pop_front(); } - return std::make_optional(false); + return consume_result::got_response; } -std::pair multiplexer::consume_next( +std::pair multiplexer::consume_next( std::string_view data, system::error_code& ec) { @@ -136,13 +136,13 @@ std::pair multiplexer::consume_next( return std::make_pair(ret, consumed); } - if (ret.has_value()) { + if (ret != consume_result::needs_more) { parser_.reset(); - commit_usage(ret.value(), consumed); + commit_usage(ret == consume_result::got_push, consumed); return std::make_pair(ret, consumed); } - return std::make_pair(std::nullopt, consumed); + return std::make_pair(consume_result::needs_more, consumed); } void multiplexer::reset() @@ -275,7 +275,7 @@ bool multiplexer::is_next_push(std::string_view data) const noexcept // Added to deal with MONITOR and also to fix PR170 which // happens under load and on low-latency networks, where we // might start receiving responses before the write operation - // completed and the request is still maked as staged and not + // completed and the request is still marked as staged and not // written. return reqs_.front()->is_waiting(); } @@ -295,18 +295,6 @@ std::size_t multiplexer::release_push_requests() return static_cast(d); } -bool multiplexer::is_waiting_response() const noexcept -{ - if (std::empty(reqs_)) - return false; - - // Under load and on low-latency networks we might start - // receiving responses before the write operation completed and - // the request is still maked as staged and not written. See - // https://github.com/boostorg/redis/issues/170 - return !reqs_.front()->is_waiting(); -} - bool multiplexer::is_writing() const noexcept { return !write_buffer_.empty(); } void multiplexer::set_receive_adapter(any_adapter adapter) diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index a9f87965..272a5859 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -55,14 +55,14 @@ reader_fsm::action reader_fsm::resume( return action_after_resume_; } - if (!res_.first.has_value()) { + if (res_.first == consume_result::needs_more) { next_read_type_ = action::type::needs_more; break; } read_buffer_->consume_committed(res_.second); - if (res_.first.value()) { + if (res_.first == consume_result::got_push) { BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second) if (ec) { action_after_resume_ = {action::type::done, 0u, ec}; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d6e04feb..bb5e1fd9 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -42,6 +42,7 @@ make_test(test_log_to_file) make_test(test_conn_logging) make_test(test_reader_fsm) make_test(test_setup_request_utils) +make_test(test_multiplexer) # Tests that require a real Redis server make_test(test_conn_quit) diff --git a/test/Jamfile b/test/Jamfile index b3f24452..20be13b7 100644 --- a/test/Jamfile +++ b/test/Jamfile @@ -58,6 +58,7 @@ local tests = test_conn_logging test_reader_fsm test_setup_request_utils + test_multiplexer ; # Build and run the tests diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp index 1d73864b..faaa5997 100644 --- a/test/test_exec_fsm.cpp +++ b/test/test_exec_fsm.cpp @@ -15,11 +15,8 @@ #include #include -#include "common.hpp" - #include #include -#include #include #include @@ -28,6 +25,7 @@ namespace asio = boost::asio; using detail::exec_fsm; using detail::multiplexer; using detail::exec_action_type; +using detail::consume_result; using detail::exec_action; using boost::system::error_code; using boost::asio::cancellation_type_t; @@ -61,6 +59,16 @@ std::ostream& operator<<(std::ostream& os, exec_action act) return os << " }"; } +std::ostream& operator<<(std::ostream& os, consume_result v) +{ + switch (v) { + case consume_result::needs_more: return os << "consume_result::needs_more"; + case consume_result::got_response: return os << "consume_result::got_response"; + case consume_result::got_push: return os << "consume_result::got_push"; + default: return os << ""; + } +} + } // namespace boost::redis::detail // Prints a message on failure. Useful for parameterized tests @@ -119,8 +127,8 @@ void test_success() // Simulate a successful read auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push - BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed + BOOST_TEST_EQ(req_status.first, consume_result::got_response); + BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed BOOST_TEST_EQ(input.done_calls, 1u); // This will awaken the exec operation, and should complete the operation @@ -218,8 +226,8 @@ void test_not_connected() // Simulate a successful read auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ(ec, error_code()); - BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push - BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed + BOOST_TEST_EQ(req_status.first, consume_result::got_response); + BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed BOOST_TEST_EQ(input.done_calls, 1u); // This will awaken the exec operation, and should complete the operation @@ -341,7 +349,7 @@ void test_cancel_notwaiting_notterminal() // Simulate a successful read auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ_MSG(ec, error_code(), tc.name); - BOOST_TEST_EQ_MSG(req_status.first.value(), false, tc.name); // it wasn't a push + BOOST_TEST_EQ_MSG(req_status.first, consume_result::got_response, tc.name); BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed BOOST_TEST_EQ_MSG(input.done_calls, 1u, tc.name); diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 4e3ff88b..98dd75d4 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include #include @@ -23,7 +22,6 @@ using boost::redis::request; using boost::redis::adapter::adapt2; using boost::redis::adapter::result; -using boost::redis::detail::multiplexer; using boost::redis::generic_response; using boost::redis::ignore_t; using boost::redis::resp3::detail::deserialize; @@ -191,160 +189,6 @@ BOOST_AUTO_TEST_CASE(issue_233_optional_array_with_null) } } -//=========================================================================== -// Multiplexer - -std::ostream& operator<<(std::ostream& os, node const& nd) -{ - os << to_string(nd.data_type) << "\n" - << nd.aggregate_size << "\n" - << nd.depth << "\n" - << nd.value; - - return os; -} - -BOOST_AUTO_TEST_CASE(multiplexer_push) -{ - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - - boost::system::error_code ec; - auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec); - - BOOST_TEST(ret.first.value()); - BOOST_CHECK_EQUAL(ret.second, 16u); - - // TODO: Provide operator << for generic_response so we can compare - // the whole vector. - BOOST_CHECK_EQUAL(resp.value().size(), 3u); - BOOST_CHECK_EQUAL(resp.value().at(1).value, "one"); - BOOST_CHECK_EQUAL(resp.value().at(2).value, "two"); - - for (auto const& e : resp.value()) - std::cout << e << std::endl; -} - -BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) -{ - multiplexer mpx; - generic_response resp; - mpx.set_receive_adapter(any_adapter{resp}); - - std::string msg; - // Only part of the message. - msg += ">2\r\n+one\r"; - - boost::system::error_code ec; - auto ret = mpx.consume_next(msg, ec); - - BOOST_TEST(!ret.first.has_value()); - - msg += "\n+two\r\n"; - ret = mpx.consume_next(msg, ec); - - BOOST_TEST(ret.first.value()); - BOOST_CHECK_EQUAL(ret.second, 16u); - - // TODO: Provide operator << for generic_response so we can compare - // the whole vector. - BOOST_CHECK_EQUAL(resp.value().size(), 3u); - BOOST_CHECK_EQUAL(resp.value().at(1).value, "one"); - BOOST_CHECK_EQUAL(resp.value().at(2).value, "two"); -} - -struct test_item { - request req; - generic_response resp; - std::shared_ptr elem_ptr; - bool done = false; - - test_item(bool cmd_with_response = true) - { - // The exact command is irrelevant because it is not being sent - // to Redis. - req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg"); - - elem_ptr = std::make_shared(req, any_adapter{resp}); - - elem_ptr->set_done_callback([this]() { - done = true; - }); - } -}; - -BOOST_AUTO_TEST_CASE(multiplexer_pipeline) -{ - test_item item1{}; - test_item item2{false}; - test_item item3{}; - - // Add some requests to the multiplexer. - multiplexer mpx; - mpx.add(item1.elem_ptr); - mpx.add(item3.elem_ptr); - mpx.add(item2.elem_ptr); - - // These requests haven't been written yet so their statuses should - // be "waiting.". - BOOST_TEST(item1.elem_ptr->is_waiting()); - BOOST_TEST(item2.elem_ptr->is_waiting()); - BOOST_TEST(item3.elem_ptr->is_waiting()); - - // There are three requests to coalesce, a second call should do - // nothing. - BOOST_CHECK_EQUAL(mpx.prepare_write(), 3u); - BOOST_CHECK_EQUAL(mpx.prepare_write(), 0u); - - // After coalescing the requests for writing their statuses should - // be changed to "staged". - BOOST_TEST(item1.elem_ptr->is_staged()); - BOOST_TEST(item2.elem_ptr->is_staged()); - BOOST_TEST(item3.elem_ptr->is_staged()); - - // There are no waiting requests to cancel since they are all - // staged. - BOOST_CHECK_EQUAL(mpx.cancel_waiting(), 0u); - - // Since the requests haven't been sent (written) the done - // callback should not have been called yet. - BOOST_TEST(!item1.done); - BOOST_TEST(!item2.done); - BOOST_TEST(!item3.done); - - // The commit_write call informs the multiplexer the payload was - // sent (e.g. written to the socket). This step releases requests - // that has no response. - BOOST_CHECK_EQUAL(mpx.commit_write(), 1u); - - // The staged status should now have changed to written. - BOOST_TEST(item1.elem_ptr->is_written()); - BOOST_TEST(item2.elem_ptr->is_done()); - BOOST_TEST(item3.elem_ptr->is_written()); - - // The done status should still be unchanged on requests that - // expect a response. - BOOST_TEST(!item1.done); - BOOST_TEST(item2.done); - BOOST_TEST(!item3.done); - - // Consumes the next message in the read buffer. - boost::system::error_code ec; - auto const ret = mpx.consume_next("+one\r\n", ec); - - // The read operation should have been successfull. - BOOST_TEST(ret.first.has_value()); - BOOST_TEST(ret.second != 0u); - - // The last request still did not get a response. - BOOST_TEST(item1.done); - BOOST_TEST(item2.done); - BOOST_TEST(!item3.done); - - // TODO: Check the first request was removed from the queue. -} - BOOST_AUTO_TEST_CASE(read_buffer_prepare_error) { using boost::redis::detail::read_buffer; diff --git a/test/test_multiplexer.cpp b/test/test_multiplexer.cpp new file mode 100644 index 00000000..91a07436 --- /dev/null +++ b/test/test_multiplexer.cpp @@ -0,0 +1,571 @@ +/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +using boost::redis::request; +using boost::redis::detail::multiplexer; +using boost::redis::detail::consume_result; +using boost::redis::generic_response; +using boost::redis::resp3::node; +using boost::redis::resp3::to_string; +using boost::redis::resp3::type; +using boost::redis::response; +using boost::redis::any_adapter; +using boost::system::error_code; + +namespace boost::redis::resp3 { + +std::ostream& operator<<(std::ostream& os, node const& nd) +{ + return os << "node{ .data_type=" << to_string(nd.data_type) + << ", .aggregate_size=" << nd.aggregate_size << ", .depth=" << nd.depth + << ", .value=" << nd.value << "}"; +} + +} // namespace boost::redis::resp3 + +namespace boost::redis::detail { + +std::ostream& operator<<(std::ostream& os, consume_result v) +{ + switch (v) { + case consume_result::needs_more: return os << "consume_result::needs_more"; + case consume_result::got_response: return os << "consume_result::got_response"; + case consume_result::got_push: return os << "consume_result::got_push"; + default: return os << ""; + } +} + +} // namespace boost::redis::detail + +namespace { + +struct test_item { + request req; + generic_response resp; + std::shared_ptr elem_ptr; + bool done = false; + + test_item(bool cmd_with_response = true) + { + // The exact command is irrelevant because it is not being sent + // to Redis. + req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg"); + + elem_ptr = std::make_shared(req, any_adapter{resp}); + + elem_ptr->set_done_callback([this]() { + done = true; + }); + } +}; + +void test_request_needs_more() +{ + // Setup + test_item item1{}; + multiplexer mpx; + + // Add the element to the multiplexer and simulate a successful write + mpx.add(item1.elem_ptr); + BOOST_TEST_EQ(mpx.prepare_write(), 1u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); + BOOST_TEST(item1.elem_ptr->is_written()); + BOOST_TEST(!item1.done); + + // Parse part of the response + error_code ec; + auto ret = mpx.consume_next("$11\r\nhello", ec); + BOOST_TEST_EQ(ret.first, consume_result::needs_more); + + // Parse the rest of it + ret = mpx.consume_next("$11\r\nhello world\r\n", ec); + BOOST_TEST_EQ(ret.first, consume_result::got_response); + BOOST_TEST(item1.resp.has_value()); + + const node expected[] = { + {type::blob_string, 1u, 0u, "hello world"}, + }; + BOOST_TEST_ALL_EQ( + item1.resp->begin(), + item1.resp->end(), + std::begin(expected), + std::end(expected)); +} + +void test_several_requests() +{ + test_item item1{}; + test_item item2{false}; + test_item item3{}; + + // Add some requests to the multiplexer. + multiplexer mpx; + mpx.add(item1.elem_ptr); + mpx.add(item3.elem_ptr); + mpx.add(item2.elem_ptr); + + // These requests haven't been written yet so their statuses should + // be "waiting.". + BOOST_TEST(item1.elem_ptr->is_waiting()); + BOOST_TEST(item2.elem_ptr->is_waiting()); + BOOST_TEST(item3.elem_ptr->is_waiting()); + + // There are three requests to coalesce, a second call should do + // nothing. + BOOST_TEST_EQ(mpx.prepare_write(), 3u); + BOOST_TEST_EQ(mpx.prepare_write(), 0u); + + // After coalescing the requests for writing their statuses should + // be changed to "staged". + BOOST_TEST(item1.elem_ptr->is_staged()); + BOOST_TEST(item2.elem_ptr->is_staged()); + BOOST_TEST(item3.elem_ptr->is_staged()); + + // There are no waiting requests to cancel since they are all + // staged. + BOOST_TEST_EQ(mpx.cancel_waiting(), 0u); + + // Since the requests haven't been sent (written) the done + // callback should not have been called yet. + BOOST_TEST(!item1.done); + BOOST_TEST(!item2.done); + BOOST_TEST(!item3.done); + + // The commit_write call informs the multiplexer the payload was + // sent (e.g. written to the socket). This step releases requests + // that has no response. + BOOST_TEST_EQ(mpx.commit_write(), 1u); + + // The staged status should now have changed to written. + BOOST_TEST(item1.elem_ptr->is_written()); + BOOST_TEST(item2.elem_ptr->is_done()); + BOOST_TEST(item3.elem_ptr->is_written()); + + // The done status should still be unchanged on requests that + // expect a response. + BOOST_TEST(!item1.done); + BOOST_TEST(item2.done); + BOOST_TEST(!item3.done); + + // Consumes the next message in the read buffer. + error_code ec; + auto ret = mpx.consume_next("+one\r\n", ec); + + // The read operation should have been successful. + BOOST_TEST_EQ(ret.first, consume_result::got_response); + BOOST_TEST(ret.second != 0u); + + // The last request still did not get a response. + BOOST_TEST(item1.done); + BOOST_TEST(item2.done); + BOOST_TEST(!item3.done); + + // Consumes the second message in the read buffer + // Consumes the next message in the read buffer. + ret = mpx.consume_next("+two\r\n", ec); + + // The read operation should have been successful. + BOOST_TEST_EQ(ret.first, consume_result::got_response); + BOOST_TEST(ret.second != 0u); + + // Everything done + BOOST_TEST(item1.done); + BOOST_TEST(item2.done); + BOOST_TEST(item3.done); +} + +// The response to a request is received before its write +// confirmation. This might happen on heavy load +void test_request_response_before_write() +{ + // Setup + auto item = std::make_unique(); + multiplexer mpx; + + // Add the request and trigger the write + mpx.add(item->elem_ptr); + BOOST_TEST(item->elem_ptr->is_waiting()); + BOOST_TEST_EQ(mpx.prepare_write(), 1u); + BOOST_TEST(item->elem_ptr->is_staged()); + BOOST_TEST(!item->done); + + // The response is received. The request is marked as done, + // even if the write hasn't been confirmed yet + error_code ec; + auto ret = mpx.consume_next("+one\r\n", ec); + + BOOST_TEST_EQ(ret.first, consume_result::got_response); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST(item->done); + + // The request is removed + item.reset(); + + // The write gets confirmed and causes no problem + BOOST_TEST_EQ(mpx.commit_write(), 0u); +} + +void test_push() +{ + // Setup + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + + // Consume an entire push + error_code ec; + auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec); + + // Check + BOOST_TEST_EQ(ret.first, consume_result::got_push); + BOOST_TEST_EQ(ret.second, 16u); + BOOST_TEST(resp.has_value()); + + const node expected[] = { + {type::push, 2u, 0u, "" }, + {type::simple_string, 1u, 1u, "one"}, + {type::simple_string, 1u, 1u, "two"}, + }; + BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected)); +} + +void test_push_needs_more() +{ + // Setup + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + std::string msg; + + // Only part of the message available. + msg += ">2\r\n+one\r"; + + // Consume it + error_code ec; + auto ret = mpx.consume_next(msg, ec); + + BOOST_TEST_EQ(ret.first, consume_result::needs_more); + + // The entire message becomes available + msg += "\n+two\r\n"; + ret = mpx.consume_next(msg, ec); + + BOOST_TEST_EQ(ret.first, consume_result::got_push); + BOOST_TEST_EQ(ret.second, 16u); + BOOST_TEST(resp.has_value()); + + const node expected[] = { + {type::push, 2u, 0u, "" }, + {type::simple_string, 1u, 1u, "one"}, + {type::simple_string, 1u, 1u, "two"}, + }; + BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected)); +} + +// If a response is received and no request is waiting, it is interpreted +// as a push (e.g. MONITOR) +void test_push_heuristics_no_request() +{ + // Setup + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + + // Response received, but no request has been sent + error_code ec; + auto const ret = mpx.consume_next("+Hello world\r\n", ec); + + // Check + BOOST_TEST_EQ(ret.first, consume_result::got_push); + BOOST_TEST_EQ(ret.second, 14u); + BOOST_TEST(resp.has_value()); + + const node expected[] = { + {type::simple_string, 1u, 0u, "Hello world"}, + }; + BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected)); +} + +// Same, but there's another request that hasn't been written yet. +// This is an edge case but might happen due to race conditions. +void test_push_heuristics_request_waiting() +{ + // Setup + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + test_item item; + + // Add the item but don't write it (e.g. the writer task is busy) + mpx.add(item.elem_ptr); + + // Response received, but no request has been sent + error_code ec; + auto const ret = mpx.consume_next("+Hello world\r\n", ec); + + // Check + BOOST_TEST_EQ(ret.first, consume_result::got_push); + BOOST_TEST_EQ(ret.second, 14u); + BOOST_TEST(resp.has_value()); + + const node expected[] = { + {type::simple_string, 1u, 0u, "Hello world"}, + }; + BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected)); +} + +// If a response is received and the first request doesn't expect a response, +// it is interpreted as a push (e.g. SUBSCRIBE with incorrect syntax) +void test_push_heuristics_request_without_response() +{ + // Setup + multiplexer mpx; + generic_response resp; + mpx.set_receive_adapter(any_adapter{resp}); + test_item item{false}; + + // Add the request to the multiplexer + mpx.add(item.elem_ptr); + + // Write it, but don't confirm the write, so the request doesn't get removed + BOOST_TEST_EQ(mpx.prepare_write(), 1u); + + // Response received (e.g. syntax error) + error_code ec; + auto const ret = mpx.consume_next("-ERR wrong syntax\r\n", ec); + + // Check + BOOST_TEST_EQ(ret.first, consume_result::got_push); + BOOST_TEST_EQ(ret.second, 19u); + BOOST_TEST_EQ(resp.error().diagnostic, "ERR wrong syntax"); +} + +// We correctly reset parsing state between requests and pushes. +void test_mix_responses_pushes() +{ + // Setup + multiplexer mpx; + generic_response push_resp; + mpx.set_receive_adapter(any_adapter{push_resp}); + test_item item1, item2; + + // Add the elements to the multiplexer and simulate a successful write + mpx.add(item1.elem_ptr); + mpx.add(item2.elem_ptr); + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); + BOOST_TEST(item1.elem_ptr->is_written()); + BOOST_TEST(!item1.done); + BOOST_TEST(item2.elem_ptr->is_written()); + BOOST_TEST(!item2.done); + + // Push + std::string_view push1_buffer = ">2\r\n+one\r\n+two\r\n"; + error_code ec; + auto ret = mpx.consume_next(push1_buffer, ec); + BOOST_TEST_EQ(ret.first, consume_result::got_push); + BOOST_TEST_EQ(ret.second, 16u); + BOOST_TEST(push_resp.has_value()); + std::vector expected{ + {type::push, 2u, 0u, "" }, + {type::simple_string, 1u, 1u, "one"}, + {type::simple_string, 1u, 1u, "two"}, + }; + BOOST_TEST_ALL_EQ(push_resp->begin(), push_resp->end(), expected.begin(), expected.end()); + BOOST_TEST_NOT(item1.done); + BOOST_TEST_NOT(item2.done); + + // First response + std::string_view response1_buffer = "$11\r\nHello world\r\n"; + ret = mpx.consume_next(response1_buffer, ec); + BOOST_TEST_EQ(ret.first, consume_result::got_response); + BOOST_TEST_EQ(ret.second, 18u); + BOOST_TEST(item1.resp.has_value()); + expected = { + {type::blob_string, 1u, 0u, "Hello world"}, + }; + BOOST_TEST_ALL_EQ(item1.resp->begin(), item1.resp->end(), expected.begin(), expected.end()); + BOOST_TEST(item1.done); + BOOST_TEST_NOT(item2.done); + + // Push + std::string_view push2_buffer = ">2\r\n+other\r\n+push\r\n"; + ret = mpx.consume_next(push2_buffer, ec); + BOOST_TEST_EQ(ret.first, consume_result::got_push); + BOOST_TEST_EQ(ret.second, 19u); + BOOST_TEST(push_resp.has_value()); + expected = { + {type::push, 2u, 0u, "" }, + {type::simple_string, 1u, 1u, "one" }, + {type::simple_string, 1u, 1u, "two" }, + {type::push, 2u, 0u, "" }, + {type::simple_string, 1u, 1u, "other"}, + {type::simple_string, 1u, 1u, "push" }, + }; + BOOST_TEST_ALL_EQ(push_resp->begin(), push_resp->end(), expected.begin(), expected.end()); + BOOST_TEST(item1.done); + BOOST_TEST_NOT(item2.done); + + // Second response + std::string_view response2_buffer = "$8\r\nResponse\r\n"; + ret = mpx.consume_next(response2_buffer, ec); + BOOST_TEST_EQ(ret.first, consume_result::got_response); + BOOST_TEST_EQ(ret.second, 14u); + BOOST_TEST(item2.resp.has_value()); + expected = { + {type::blob_string, 1u, 0u, "Response"}, + }; + BOOST_TEST_ALL_EQ(item2.resp->begin(), item2.resp->end(), expected.begin(), expected.end()); + BOOST_TEST(item1.done); + BOOST_TEST(item2.done); + + // Check usage + const auto usg = mpx.get_usage(); + BOOST_TEST_EQ(usg.commands_sent, 2u); + BOOST_TEST_EQ(usg.bytes_sent, item1.req.payload().size() + item2.req.payload().size()); + BOOST_TEST_EQ(usg.responses_received, 2u); + BOOST_TEST_EQ(usg.pushes_received, 2u); + BOOST_TEST_EQ(usg.response_bytes_received, response1_buffer.size() + response2_buffer.size()); + BOOST_TEST_EQ(usg.push_bytes_received, push1_buffer.size() + push2_buffer.size()); +} + +// Cancellation on connection lost +void test_cancel_on_connection_lost() +{ + // Setup + multiplexer mpx; + test_item item_written1, item_written2, item_staged1, item_staged2, item_waiting1, item_waiting2; + + // Different items have different configurations + // (note that these are all true by default) + item_written1.req.get_config().cancel_if_unresponded = false; + item_staged1.req.get_config().cancel_if_unresponded = false; + item_waiting1.req.get_config().cancel_on_connection_lost = false; + + // Make each item reach the state it should be in + mpx.add(item_written1.elem_ptr); + mpx.add(item_written2.elem_ptr); + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); + + mpx.add(item_staged1.elem_ptr); + mpx.add(item_staged2.elem_ptr); + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + + mpx.add(item_waiting1.elem_ptr); + mpx.add(item_waiting2.elem_ptr); + + // Check that we got it right + BOOST_TEST(item_written1.elem_ptr->is_written()); + BOOST_TEST(item_written2.elem_ptr->is_written()); + BOOST_TEST(item_staged1.elem_ptr->is_staged()); + BOOST_TEST(item_staged2.elem_ptr->is_staged()); + BOOST_TEST(item_waiting1.elem_ptr->is_waiting()); + BOOST_TEST(item_waiting2.elem_ptr->is_waiting()); + + // Trigger a connection lost event + mpx.cancel_on_conn_lost(); + + // The ones with the cancellation settings set to false are back to waiting. + // Others are cancelled + BOOST_TEST(!item_written1.done); + BOOST_TEST(item_written1.elem_ptr->is_waiting()); + BOOST_TEST(item_written2.done); + BOOST_TEST(!item_staged1.done); + BOOST_TEST(item_staged1.elem_ptr->is_waiting()); + BOOST_TEST(item_staged2.done); + BOOST_TEST(!item_waiting1.done); + BOOST_TEST(item_waiting1.elem_ptr->is_waiting()); + BOOST_TEST(item_waiting2.done); + + // Triggering it again does nothing + mpx.cancel_on_conn_lost(); + BOOST_TEST(!item_written1.done); + BOOST_TEST(item_written1.elem_ptr->is_waiting()); +} + +// The test below fails. Uncomment when this is fixed: +// https://github.com/boostorg/redis/issues/307 +// void test_cancel_on_connection_lost_half_parsed_response() +// { +// // Setup +// multiplexer mpx; +// test_item item; +// item.req.get_config().cancel_if_unresponded = false; +// error_code ec; + +// // Add the request, write it and start parsing the response +// mpx.add(item.elem_ptr); +// BOOST_TEST_EQ(mpx.prepare_write(), 1u); +// BOOST_TEST_EQ(mpx.commit_write(), 0u); +// auto res = mpx.consume_next("*2\r\n+hello\r\n", ec); +// BOOST_TEST_EQ(res.first, consume_result::needs_more); +// BOOST_TEST_EQ(ec, error_code()); + +// // Trigger a connection lost event +// mpx.cancel_on_conn_lost(); +// BOOST_TEST(!item.done); +// BOOST_TEST(item.elem_ptr->is_waiting()); + +// // Simulate a reconnection +// mpx.reset(); + +// // Successful write, and this time the response is complete +// BOOST_TEST_EQ(mpx.prepare_write(), 1u); +// BOOST_TEST_EQ(mpx.commit_write(), 0u); +// res = mpx.consume_next("*2\r\n+hello\r\n+world\r\n", ec); +// BOOST_TEST_EQ(res.first, consume_result::got_response); +// BOOST_TEST_EQ(ec, error_code()); + +// // Check the response +// BOOST_TEST(item.resp.has_value()); +// const node expected[] = { +// {type::array, 2u, 0u, "" }, +// {type::simple_string, 1u, 1u, "hello"}, +// {type::simple_string, 1u, 1u, "world"}, +// }; +// BOOST_TEST_ALL_EQ( +// item.resp->begin(), +// item.resp->end(), +// std::begin(expected), +// std::end(expected)); +// } + +} // namespace + +int main() +{ + test_request_needs_more(); + test_several_requests(); + test_request_response_before_write(); + test_push(); + test_push_needs_more(); + test_push_heuristics_no_request(); + test_push_heuristics_request_without_response(); + test_push_heuristics_request_waiting(); + test_mix_responses_pushes(); + test_cancel_on_connection_lost(); + // test_cancel_on_connection_lost_half_parsed_response(); + + return boost::report_errors(); +}