2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Replaces tribool by an enum and adds coverage for multiplexer (#301)

* In the context of the multiplexer, replaces tribool by consume_result to enhance readability and make values smaller
* Splits the multiplexer tests out of test_low_level_sync_sans_io into a separate test file
* Increases testing coverage for the multiplexer class

Entails no functional change.
This commit is contained in:
Anarthal (Rubén Pérez)
2025-09-18 13:55:29 +02:00
committed by GitHub
parent 40417a13b2
commit 8da18379ba
9 changed files with 616 additions and 204 deletions

View File

@@ -17,11 +17,9 @@
#include <boost/system/error_code.hpp>
#include <algorithm>
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <string_view>
#include <utility>
@@ -31,7 +29,13 @@ class request;
namespace detail {
using tribool = std::optional<bool>;
// Return type of the multiplexer::consume_next function
enum class consume_result
{
needs_more, // consume_next didn't have enough data
got_response, // got a response to a regular command, vs. a push
got_push, // got a response to a push
};
class multiplexer {
public:
@@ -121,11 +125,9 @@ public:
// they don't have a response e.g. SUBSCRIBE.
auto commit_write() -> std::size_t;
// If the tribool contains no value more data is needed, otherwise
// if the value is true the message consumed is a push.
[[nodiscard]]
auto consume_next(std::string_view data, system::error_code& ec)
-> std::pair<tribool, std::size_t>;
-> std::pair<consume_result, std::size_t>;
auto add(std::shared_ptr<elem> const& ptr) -> void;
auto reset() -> void;
@@ -166,9 +168,6 @@ public:
auto is_writing() const noexcept -> bool;
private:
[[nodiscard]]
auto is_waiting_response() const noexcept -> bool;
void commit_usage(bool is_push, std::size_t size);
[[nodiscard]]
@@ -179,7 +178,7 @@ private:
auto release_push_requests() -> std::size_t;
[[nodiscard]]
tribool consume_next_impl(std::string_view data, system::error_code& ec);
consume_result consume_next_impl(std::string_view data, system::error_code& ec);
std::string write_buffer_;
std::deque<std::shared_ptr<elem>> reqs_;

View File

@@ -48,7 +48,7 @@ private:
action action_after_resume_;
action::type next_read_type_ = action::type::append_some;
multiplexer* mpx_ = nullptr;
std::pair<tribool, std::size_t> res_{std::make_pair(std::nullopt, 0)};
std::pair<consume_result, std::size_t> res_{consume_result::needs_more, 0u};
};
} // namespace boost::redis::detail

View File

@@ -7,6 +7,8 @@
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/request.hpp>
#include <boost/asio/error.hpp>
#include <memory>
namespace boost::redis::detail {
@@ -76,7 +78,7 @@ void multiplexer::add(std::shared_ptr<elem> const& info)
}
}
tribool multiplexer::consume_next_impl(std::string_view data, system::error_code& ec)
consume_result multiplexer::consume_next_impl(std::string_view data, system::error_code& ec)
{
// We arrive here in two states:
//
@@ -86,7 +88,7 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code
// until the parsing of a complete message ends.
//
// 2. On a new message, in which case we have to determine
// whether the next messag is a push or a response.
// whether the next message is a push or a response.
//
BOOST_ASSERT(!data.empty());
@@ -95,25 +97,23 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code
if (on_push_) {
if (!resp3::parse(parser_, data, receive_adapter_, ec))
return std::nullopt;
return consume_result::needs_more;
return std::make_optional(true);
return consume_result::got_push;
}
BOOST_ASSERT_MSG(
is_waiting_response(),
"Not waiting for a response (using MONITOR command perhaps?)");
BOOST_ASSERT(!reqs_.empty());
BOOST_ASSERT(reqs_.front() != nullptr);
BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0);
BOOST_ASSERT(!reqs_.front()->is_waiting());
if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec))
return std::nullopt;
return consume_result::needs_more;
if (ec) {
reqs_.front()->notify_error(ec);
reqs_.pop_front();
return std::make_optional(false);
return consume_result::got_response;
}
reqs_.front()->commit_response(parser_.get_consumed());
@@ -123,10 +123,10 @@ tribool multiplexer::consume_next_impl(std::string_view data, system::error_code
reqs_.pop_front();
}
return std::make_optional(false);
return consume_result::got_response;
}
std::pair<tribool, std::size_t> multiplexer::consume_next(
std::pair<consume_result, std::size_t> multiplexer::consume_next(
std::string_view data,
system::error_code& ec)
{
@@ -136,13 +136,13 @@ std::pair<tribool, std::size_t> multiplexer::consume_next(
return std::make_pair(ret, consumed);
}
if (ret.has_value()) {
if (ret != consume_result::needs_more) {
parser_.reset();
commit_usage(ret.value(), consumed);
commit_usage(ret == consume_result::got_push, consumed);
return std::make_pair(ret, consumed);
}
return std::make_pair(std::nullopt, consumed);
return std::make_pair(consume_result::needs_more, consumed);
}
void multiplexer::reset()
@@ -275,7 +275,7 @@ bool multiplexer::is_next_push(std::string_view data) const noexcept
// Added to deal with MONITOR and also to fix PR170 which
// happens under load and on low-latency networks, where we
// might start receiving responses before the write operation
// completed and the request is still maked as staged and not
// completed and the request is still marked as staged and not
// written.
return reqs_.front()->is_waiting();
}
@@ -295,18 +295,6 @@ std::size_t multiplexer::release_push_requests()
return static_cast<std::size_t>(d);
}
bool multiplexer::is_waiting_response() const noexcept
{
if (std::empty(reqs_))
return false;
// Under load and on low-latency networks we might start
// receiving responses before the write operation completed and
// the request is still maked as staged and not written. See
// https://github.com/boostorg/redis/issues/170
return !reqs_.front()->is_waiting();
}
bool multiplexer::is_writing() const noexcept { return !write_buffer_.empty(); }
void multiplexer::set_receive_adapter(any_adapter adapter)

View File

@@ -55,14 +55,14 @@ reader_fsm::action reader_fsm::resume(
return action_after_resume_;
}
if (!res_.first.has_value()) {
if (res_.first == consume_result::needs_more) {
next_read_type_ = action::type::needs_more;
break;
}
read_buffer_->consume_committed(res_.second);
if (res_.first.value()) {
if (res_.first == consume_result::got_push) {
BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second)
if (ec) {
action_after_resume_ = {action::type::done, 0u, ec};

View File

@@ -42,6 +42,7 @@ make_test(test_log_to_file)
make_test(test_conn_logging)
make_test(test_reader_fsm)
make_test(test_setup_request_utils)
make_test(test_multiplexer)
# Tests that require a real Redis server
make_test(test_conn_quit)

View File

@@ -58,6 +58,7 @@ local tests =
test_conn_logging
test_reader_fsm
test_setup_request_utils
test_multiplexer
;
# Build and run the tests

View File

@@ -15,11 +15,8 @@
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include "common.hpp"
#include <cstddef>
#include <memory>
#include <optional>
#include <ostream>
#include <utility>
@@ -28,6 +25,7 @@ namespace asio = boost::asio;
using detail::exec_fsm;
using detail::multiplexer;
using detail::exec_action_type;
using detail::consume_result;
using detail::exec_action;
using boost::system::error_code;
using boost::asio::cancellation_type_t;
@@ -61,6 +59,16 @@ std::ostream& operator<<(std::ostream& os, exec_action act)
return os << " }";
}
std::ostream& operator<<(std::ostream& os, consume_result v)
{
switch (v) {
case consume_result::needs_more: return os << "consume_result::needs_more";
case consume_result::got_response: return os << "consume_result::got_response";
case consume_result::got_push: return os << "consume_result::got_push";
default: return os << "<unknown consume_result>";
}
}
} // namespace boost::redis::detail
// Prints a message on failure. Useful for parameterized tests
@@ -119,8 +127,8 @@ void test_success()
// Simulate a successful read
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
BOOST_TEST_EQ(input.done_calls, 1u);
// This will awaken the exec operation, and should complete the operation
@@ -218,8 +226,8 @@ void test_not_connected()
// Simulate a successful read
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
BOOST_TEST_EQ(input.done_calls, 1u);
// This will awaken the exec operation, and should complete the operation
@@ -341,7 +349,7 @@ void test_cancel_notwaiting_notterminal()
// Simulate a successful read
auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec);
BOOST_TEST_EQ_MSG(ec, error_code(), tc.name);
BOOST_TEST_EQ_MSG(req_status.first.value(), false, tc.name); // it wasn't a push
BOOST_TEST_EQ_MSG(req_status.first, consume_result::got_response, tc.name);
BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed
BOOST_TEST_EQ_MSG(input.done_calls, 1u, tc.name);

View File

@@ -6,7 +6,6 @@
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/node.hpp>
@@ -23,7 +22,6 @@
using boost::redis::request;
using boost::redis::adapter::adapt2;
using boost::redis::adapter::result;
using boost::redis::detail::multiplexer;
using boost::redis::generic_response;
using boost::redis::ignore_t;
using boost::redis::resp3::detail::deserialize;
@@ -191,160 +189,6 @@ BOOST_AUTO_TEST_CASE(issue_233_optional_array_with_null)
}
}
//===========================================================================
// Multiplexer
std::ostream& operator<<(std::ostream& os, node const& nd)
{
os << to_string(nd.data_type) << "\n"
<< nd.aggregate_size << "\n"
<< nd.depth << "\n"
<< nd.value;
return os;
}
BOOST_AUTO_TEST_CASE(multiplexer_push)
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
boost::system::error_code ec;
auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec);
BOOST_TEST(ret.first.value());
BOOST_CHECK_EQUAL(ret.second, 16u);
// TODO: Provide operator << for generic_response so we can compare
// the whole vector.
BOOST_CHECK_EQUAL(resp.value().size(), 3u);
BOOST_CHECK_EQUAL(resp.value().at(1).value, "one");
BOOST_CHECK_EQUAL(resp.value().at(2).value, "two");
for (auto const& e : resp.value())
std::cout << e << std::endl;
}
BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more)
{
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
std::string msg;
// Only part of the message.
msg += ">2\r\n+one\r";
boost::system::error_code ec;
auto ret = mpx.consume_next(msg, ec);
BOOST_TEST(!ret.first.has_value());
msg += "\n+two\r\n";
ret = mpx.consume_next(msg, ec);
BOOST_TEST(ret.first.value());
BOOST_CHECK_EQUAL(ret.second, 16u);
// TODO: Provide operator << for generic_response so we can compare
// the whole vector.
BOOST_CHECK_EQUAL(resp.value().size(), 3u);
BOOST_CHECK_EQUAL(resp.value().at(1).value, "one");
BOOST_CHECK_EQUAL(resp.value().at(2).value, "two");
}
struct test_item {
request req;
generic_response resp;
std::shared_ptr<multiplexer::elem> elem_ptr;
bool done = false;
test_item(bool cmd_with_response = true)
{
// The exact command is irrelevant because it is not being sent
// to Redis.
req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg");
elem_ptr = std::make_shared<multiplexer::elem>(req, any_adapter{resp});
elem_ptr->set_done_callback([this]() {
done = true;
});
}
};
BOOST_AUTO_TEST_CASE(multiplexer_pipeline)
{
test_item item1{};
test_item item2{false};
test_item item3{};
// Add some requests to the multiplexer.
multiplexer mpx;
mpx.add(item1.elem_ptr);
mpx.add(item3.elem_ptr);
mpx.add(item2.elem_ptr);
// These requests haven't been written yet so their statuses should
// be "waiting.".
BOOST_TEST(item1.elem_ptr->is_waiting());
BOOST_TEST(item2.elem_ptr->is_waiting());
BOOST_TEST(item3.elem_ptr->is_waiting());
// There are three requests to coalesce, a second call should do
// nothing.
BOOST_CHECK_EQUAL(mpx.prepare_write(), 3u);
BOOST_CHECK_EQUAL(mpx.prepare_write(), 0u);
// After coalescing the requests for writing their statuses should
// be changed to "staged".
BOOST_TEST(item1.elem_ptr->is_staged());
BOOST_TEST(item2.elem_ptr->is_staged());
BOOST_TEST(item3.elem_ptr->is_staged());
// There are no waiting requests to cancel since they are all
// staged.
BOOST_CHECK_EQUAL(mpx.cancel_waiting(), 0u);
// Since the requests haven't been sent (written) the done
// callback should not have been called yet.
BOOST_TEST(!item1.done);
BOOST_TEST(!item2.done);
BOOST_TEST(!item3.done);
// The commit_write call informs the multiplexer the payload was
// sent (e.g. written to the socket). This step releases requests
// that has no response.
BOOST_CHECK_EQUAL(mpx.commit_write(), 1u);
// The staged status should now have changed to written.
BOOST_TEST(item1.elem_ptr->is_written());
BOOST_TEST(item2.elem_ptr->is_done());
BOOST_TEST(item3.elem_ptr->is_written());
// The done status should still be unchanged on requests that
// expect a response.
BOOST_TEST(!item1.done);
BOOST_TEST(item2.done);
BOOST_TEST(!item3.done);
// Consumes the next message in the read buffer.
boost::system::error_code ec;
auto const ret = mpx.consume_next("+one\r\n", ec);
// The read operation should have been successfull.
BOOST_TEST(ret.first.has_value());
BOOST_TEST(ret.second != 0u);
// The last request still did not get a response.
BOOST_TEST(item1.done);
BOOST_TEST(item2.done);
BOOST_TEST(!item3.done);
// TODO: Check the first request was removed from the queue.
}
BOOST_AUTO_TEST_CASE(read_buffer_prepare_error)
{
using boost::redis::detail::read_buffer;

571
test/test_multiplexer.cpp Normal file
View File

@@ -0,0 +1,571 @@
/* Copyright (c) 2018-2024 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/response.hpp>
#include <boost/core/lightweight_test.hpp>
#include <iostream>
#include <iterator>
#include <memory>
#include <ostream>
#include <string>
#include <string_view>
using boost::redis::request;
using boost::redis::detail::multiplexer;
using boost::redis::detail::consume_result;
using boost::redis::generic_response;
using boost::redis::resp3::node;
using boost::redis::resp3::to_string;
using boost::redis::resp3::type;
using boost::redis::response;
using boost::redis::any_adapter;
using boost::system::error_code;
namespace boost::redis::resp3 {
std::ostream& operator<<(std::ostream& os, node const& nd)
{
return os << "node{ .data_type=" << to_string(nd.data_type)
<< ", .aggregate_size=" << nd.aggregate_size << ", .depth=" << nd.depth
<< ", .value=" << nd.value << "}";
}
} // namespace boost::redis::resp3
namespace boost::redis::detail {
std::ostream& operator<<(std::ostream& os, consume_result v)
{
switch (v) {
case consume_result::needs_more: return os << "consume_result::needs_more";
case consume_result::got_response: return os << "consume_result::got_response";
case consume_result::got_push: return os << "consume_result::got_push";
default: return os << "<unknown consume_result>";
}
}
} // namespace boost::redis::detail
namespace {
struct test_item {
request req;
generic_response resp;
std::shared_ptr<multiplexer::elem> elem_ptr;
bool done = false;
test_item(bool cmd_with_response = true)
{
// The exact command is irrelevant because it is not being sent
// to Redis.
req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg");
elem_ptr = std::make_shared<multiplexer::elem>(req, any_adapter{resp});
elem_ptr->set_done_callback([this]() {
done = true;
});
}
};
void test_request_needs_more()
{
// Setup
test_item item1{};
multiplexer mpx;
// Add the element to the multiplexer and simulate a successful write
mpx.add(item1.elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(item1.elem_ptr->is_written());
BOOST_TEST(!item1.done);
// Parse part of the response
error_code ec;
auto ret = mpx.consume_next("$11\r\nhello", ec);
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
// Parse the rest of it
ret = mpx.consume_next("$11\r\nhello world\r\n", ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST(item1.resp.has_value());
const node expected[] = {
{type::blob_string, 1u, 0u, "hello world"},
};
BOOST_TEST_ALL_EQ(
item1.resp->begin(),
item1.resp->end(),
std::begin(expected),
std::end(expected));
}
void test_several_requests()
{
test_item item1{};
test_item item2{false};
test_item item3{};
// Add some requests to the multiplexer.
multiplexer mpx;
mpx.add(item1.elem_ptr);
mpx.add(item3.elem_ptr);
mpx.add(item2.elem_ptr);
// These requests haven't been written yet so their statuses should
// be "waiting.".
BOOST_TEST(item1.elem_ptr->is_waiting());
BOOST_TEST(item2.elem_ptr->is_waiting());
BOOST_TEST(item3.elem_ptr->is_waiting());
// There are three requests to coalesce, a second call should do
// nothing.
BOOST_TEST_EQ(mpx.prepare_write(), 3u);
BOOST_TEST_EQ(mpx.prepare_write(), 0u);
// After coalescing the requests for writing their statuses should
// be changed to "staged".
BOOST_TEST(item1.elem_ptr->is_staged());
BOOST_TEST(item2.elem_ptr->is_staged());
BOOST_TEST(item3.elem_ptr->is_staged());
// There are no waiting requests to cancel since they are all
// staged.
BOOST_TEST_EQ(mpx.cancel_waiting(), 0u);
// Since the requests haven't been sent (written) the done
// callback should not have been called yet.
BOOST_TEST(!item1.done);
BOOST_TEST(!item2.done);
BOOST_TEST(!item3.done);
// The commit_write call informs the multiplexer the payload was
// sent (e.g. written to the socket). This step releases requests
// that has no response.
BOOST_TEST_EQ(mpx.commit_write(), 1u);
// The staged status should now have changed to written.
BOOST_TEST(item1.elem_ptr->is_written());
BOOST_TEST(item2.elem_ptr->is_done());
BOOST_TEST(item3.elem_ptr->is_written());
// The done status should still be unchanged on requests that
// expect a response.
BOOST_TEST(!item1.done);
BOOST_TEST(item2.done);
BOOST_TEST(!item3.done);
// Consumes the next message in the read buffer.
error_code ec;
auto ret = mpx.consume_next("+one\r\n", ec);
// The read operation should have been successful.
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST(ret.second != 0u);
// The last request still did not get a response.
BOOST_TEST(item1.done);
BOOST_TEST(item2.done);
BOOST_TEST(!item3.done);
// Consumes the second message in the read buffer
// Consumes the next message in the read buffer.
ret = mpx.consume_next("+two\r\n", ec);
// The read operation should have been successful.
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST(ret.second != 0u);
// Everything done
BOOST_TEST(item1.done);
BOOST_TEST(item2.done);
BOOST_TEST(item3.done);
}
// The response to a request is received before its write
// confirmation. This might happen on heavy load
void test_request_response_before_write()
{
// Setup
auto item = std::make_unique<test_item>();
multiplexer mpx;
// Add the request and trigger the write
mpx.add(item->elem_ptr);
BOOST_TEST(item->elem_ptr->is_waiting());
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
BOOST_TEST(item->elem_ptr->is_staged());
BOOST_TEST(!item->done);
// The response is received. The request is marked as done,
// even if the write hasn't been confirmed yet
error_code ec;
auto ret = mpx.consume_next("+one\r\n", ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST(item->done);
// The request is removed
item.reset();
// The write gets confirmed and causes no problem
BOOST_TEST_EQ(mpx.commit_write(), 0u);
}
void test_push()
{
// Setup
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
// Consume an entire push
error_code ec;
auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec);
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 16u);
BOOST_TEST(resp.has_value());
const node expected[] = {
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "one"},
{type::simple_string, 1u, 1u, "two"},
};
BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected));
}
void test_push_needs_more()
{
// Setup
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
std::string msg;
// Only part of the message available.
msg += ">2\r\n+one\r";
// Consume it
error_code ec;
auto ret = mpx.consume_next(msg, ec);
BOOST_TEST_EQ(ret.first, consume_result::needs_more);
// The entire message becomes available
msg += "\n+two\r\n";
ret = mpx.consume_next(msg, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 16u);
BOOST_TEST(resp.has_value());
const node expected[] = {
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "one"},
{type::simple_string, 1u, 1u, "two"},
};
BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected));
}
// If a response is received and no request is waiting, it is interpreted
// as a push (e.g. MONITOR)
void test_push_heuristics_no_request()
{
// Setup
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
// Response received, but no request has been sent
error_code ec;
auto const ret = mpx.consume_next("+Hello world\r\n", ec);
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 14u);
BOOST_TEST(resp.has_value());
const node expected[] = {
{type::simple_string, 1u, 0u, "Hello world"},
};
BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected));
}
// Same, but there's another request that hasn't been written yet.
// This is an edge case but might happen due to race conditions.
void test_push_heuristics_request_waiting()
{
// Setup
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
test_item item;
// Add the item but don't write it (e.g. the writer task is busy)
mpx.add(item.elem_ptr);
// Response received, but no request has been sent
error_code ec;
auto const ret = mpx.consume_next("+Hello world\r\n", ec);
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 14u);
BOOST_TEST(resp.has_value());
const node expected[] = {
{type::simple_string, 1u, 0u, "Hello world"},
};
BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected));
}
// If a response is received and the first request doesn't expect a response,
// it is interpreted as a push (e.g. SUBSCRIBE with incorrect syntax)
void test_push_heuristics_request_without_response()
{
// Setup
multiplexer mpx;
generic_response resp;
mpx.set_receive_adapter(any_adapter{resp});
test_item item{false};
// Add the request to the multiplexer
mpx.add(item.elem_ptr);
// Write it, but don't confirm the write, so the request doesn't get removed
BOOST_TEST_EQ(mpx.prepare_write(), 1u);
// Response received (e.g. syntax error)
error_code ec;
auto const ret = mpx.consume_next("-ERR wrong syntax\r\n", ec);
// Check
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 19u);
BOOST_TEST_EQ(resp.error().diagnostic, "ERR wrong syntax");
}
// We correctly reset parsing state between requests and pushes.
void test_mix_responses_pushes()
{
// Setup
multiplexer mpx;
generic_response push_resp;
mpx.set_receive_adapter(any_adapter{push_resp});
test_item item1, item2;
// Add the elements to the multiplexer and simulate a successful write
mpx.add(item1.elem_ptr);
mpx.add(item2.elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
BOOST_TEST(item1.elem_ptr->is_written());
BOOST_TEST(!item1.done);
BOOST_TEST(item2.elem_ptr->is_written());
BOOST_TEST(!item2.done);
// Push
std::string_view push1_buffer = ">2\r\n+one\r\n+two\r\n";
error_code ec;
auto ret = mpx.consume_next(push1_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 16u);
BOOST_TEST(push_resp.has_value());
std::vector<node> expected{
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "one"},
{type::simple_string, 1u, 1u, "two"},
};
BOOST_TEST_ALL_EQ(push_resp->begin(), push_resp->end(), expected.begin(), expected.end());
BOOST_TEST_NOT(item1.done);
BOOST_TEST_NOT(item2.done);
// First response
std::string_view response1_buffer = "$11\r\nHello world\r\n";
ret = mpx.consume_next(response1_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, 18u);
BOOST_TEST(item1.resp.has_value());
expected = {
{type::blob_string, 1u, 0u, "Hello world"},
};
BOOST_TEST_ALL_EQ(item1.resp->begin(), item1.resp->end(), expected.begin(), expected.end());
BOOST_TEST(item1.done);
BOOST_TEST_NOT(item2.done);
// Push
std::string_view push2_buffer = ">2\r\n+other\r\n+push\r\n";
ret = mpx.consume_next(push2_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_push);
BOOST_TEST_EQ(ret.second, 19u);
BOOST_TEST(push_resp.has_value());
expected = {
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "one" },
{type::simple_string, 1u, 1u, "two" },
{type::push, 2u, 0u, "" },
{type::simple_string, 1u, 1u, "other"},
{type::simple_string, 1u, 1u, "push" },
};
BOOST_TEST_ALL_EQ(push_resp->begin(), push_resp->end(), expected.begin(), expected.end());
BOOST_TEST(item1.done);
BOOST_TEST_NOT(item2.done);
// Second response
std::string_view response2_buffer = "$8\r\nResponse\r\n";
ret = mpx.consume_next(response2_buffer, ec);
BOOST_TEST_EQ(ret.first, consume_result::got_response);
BOOST_TEST_EQ(ret.second, 14u);
BOOST_TEST(item2.resp.has_value());
expected = {
{type::blob_string, 1u, 0u, "Response"},
};
BOOST_TEST_ALL_EQ(item2.resp->begin(), item2.resp->end(), expected.begin(), expected.end());
BOOST_TEST(item1.done);
BOOST_TEST(item2.done);
// Check usage
const auto usg = mpx.get_usage();
BOOST_TEST_EQ(usg.commands_sent, 2u);
BOOST_TEST_EQ(usg.bytes_sent, item1.req.payload().size() + item2.req.payload().size());
BOOST_TEST_EQ(usg.responses_received, 2u);
BOOST_TEST_EQ(usg.pushes_received, 2u);
BOOST_TEST_EQ(usg.response_bytes_received, response1_buffer.size() + response2_buffer.size());
BOOST_TEST_EQ(usg.push_bytes_received, push1_buffer.size() + push2_buffer.size());
}
// Cancellation on connection lost
void test_cancel_on_connection_lost()
{
// Setup
multiplexer mpx;
test_item item_written1, item_written2, item_staged1, item_staged2, item_waiting1, item_waiting2;
// Different items have different configurations
// (note that these are all true by default)
item_written1.req.get_config().cancel_if_unresponded = false;
item_staged1.req.get_config().cancel_if_unresponded = false;
item_waiting1.req.get_config().cancel_on_connection_lost = false;
// Make each item reach the state it should be in
mpx.add(item_written1.elem_ptr);
mpx.add(item_written2.elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
BOOST_TEST_EQ(mpx.commit_write(), 0u);
mpx.add(item_staged1.elem_ptr);
mpx.add(item_staged2.elem_ptr);
BOOST_TEST_EQ(mpx.prepare_write(), 2u);
mpx.add(item_waiting1.elem_ptr);
mpx.add(item_waiting2.elem_ptr);
// Check that we got it right
BOOST_TEST(item_written1.elem_ptr->is_written());
BOOST_TEST(item_written2.elem_ptr->is_written());
BOOST_TEST(item_staged1.elem_ptr->is_staged());
BOOST_TEST(item_staged2.elem_ptr->is_staged());
BOOST_TEST(item_waiting1.elem_ptr->is_waiting());
BOOST_TEST(item_waiting2.elem_ptr->is_waiting());
// Trigger a connection lost event
mpx.cancel_on_conn_lost();
// The ones with the cancellation settings set to false are back to waiting.
// Others are cancelled
BOOST_TEST(!item_written1.done);
BOOST_TEST(item_written1.elem_ptr->is_waiting());
BOOST_TEST(item_written2.done);
BOOST_TEST(!item_staged1.done);
BOOST_TEST(item_staged1.elem_ptr->is_waiting());
BOOST_TEST(item_staged2.done);
BOOST_TEST(!item_waiting1.done);
BOOST_TEST(item_waiting1.elem_ptr->is_waiting());
BOOST_TEST(item_waiting2.done);
// Triggering it again does nothing
mpx.cancel_on_conn_lost();
BOOST_TEST(!item_written1.done);
BOOST_TEST(item_written1.elem_ptr->is_waiting());
}
// The test below fails. Uncomment when this is fixed:
// https://github.com/boostorg/redis/issues/307
// void test_cancel_on_connection_lost_half_parsed_response()
// {
// // Setup
// multiplexer mpx;
// test_item item;
// item.req.get_config().cancel_if_unresponded = false;
// error_code ec;
// // Add the request, write it and start parsing the response
// mpx.add(item.elem_ptr);
// BOOST_TEST_EQ(mpx.prepare_write(), 1u);
// BOOST_TEST_EQ(mpx.commit_write(), 0u);
// auto res = mpx.consume_next("*2\r\n+hello\r\n", ec);
// BOOST_TEST_EQ(res.first, consume_result::needs_more);
// BOOST_TEST_EQ(ec, error_code());
// // Trigger a connection lost event
// mpx.cancel_on_conn_lost();
// BOOST_TEST(!item.done);
// BOOST_TEST(item.elem_ptr->is_waiting());
// // Simulate a reconnection
// mpx.reset();
// // Successful write, and this time the response is complete
// BOOST_TEST_EQ(mpx.prepare_write(), 1u);
// BOOST_TEST_EQ(mpx.commit_write(), 0u);
// res = mpx.consume_next("*2\r\n+hello\r\n+world\r\n", ec);
// BOOST_TEST_EQ(res.first, consume_result::got_response);
// BOOST_TEST_EQ(ec, error_code());
// // Check the response
// BOOST_TEST(item.resp.has_value());
// const node expected[] = {
// {type::array, 2u, 0u, "" },
// {type::simple_string, 1u, 1u, "hello"},
// {type::simple_string, 1u, 1u, "world"},
// };
// BOOST_TEST_ALL_EQ(
// item.resp->begin(),
// item.resp->end(),
// std::begin(expected),
// std::end(expected));
// }
} // namespace
int main()
{
test_request_needs_more();
test_several_requests();
test_request_response_before_write();
test_push();
test_push_needs_more();
test_push_heuristics_no_request();
test_push_heuristics_request_without_response();
test_push_heuristics_request_waiting();
test_mix_responses_pushes();
test_cancel_on_connection_lost();
// test_cancel_on_connection_lost_half_parsed_response();
return boost::report_errors();
}