2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00
Files
redis/test/test_exec_fsm.cpp
Anarthal (Rubén Pérez) bea547481a Adds support for PubSub state restoration (#375)
Adds request::{subscribe, unsubscribe, psubscribe, punsubscribe}. When requests created with these functions are executed successfully, the created subscriptions are tracked and restore on re-connection.

close #367
2026-01-09 21:08:54 +01:00

501 lines
17 KiB
C++

//
// Copyright (c) 2025 Marcelo Zimbres Silva (mzimbres@gmail.com),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/redis/detail/connection_state.hpp>
#include <boost/redis/detail/exec_fsm.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/request.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
#include <boost/core/lightweight_test.hpp>
#include <boost/system/error_code.hpp>
#include "sansio_utils.hpp"
#include <cstddef>
#include <memory>
#include <ostream>
#include <utility>
using namespace boost::redis;
namespace asio = boost::asio;
using detail::exec_fsm;
using detail::multiplexer;
using detail::exec_action_type;
using detail::consume_result;
using detail::exec_action;
using detail::connection_state;
using boost::system::error_code;
using boost::asio::cancellation_type_t;
#define BOOST_REDIS_EXEC_SWITCH_CASE(elem) \
case exec_action_type::elem: return "exec_action_type::" #elem
static auto to_string(exec_action_type t) noexcept -> char const*
{
switch (t) {
BOOST_REDIS_EXEC_SWITCH_CASE(setup_cancellation);
BOOST_REDIS_EXEC_SWITCH_CASE(immediate);
BOOST_REDIS_EXEC_SWITCH_CASE(done);
BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer);
BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response);
default: return "exec_action_type::<invalid type>";
}
}
// Operators
namespace boost::redis::detail {
std::ostream& operator<<(std::ostream& os, exec_action_type type)
{
os << to_string(type);
return os;
}
bool operator==(exec_action lhs, exec_action rhs) noexcept
{
if (lhs.type() != rhs.type())
return false;
else if (lhs.type() == exec_action_type::done)
return lhs.bytes_read() == rhs.bytes_read() && lhs.error() == rhs.error();
else
return true;
}
std::ostream& operator<<(std::ostream& os, exec_action act)
{
os << "exec_action{ .type=" << act.type();
if (act.type() == exec_action_type::done)
os << ", .bytes_read=" << act.bytes_read() << ", .error=" << act.error();
return os << " }";
}
std::ostream& operator<<(std::ostream& os, consume_result v)
{
switch (v) {
case consume_result::needs_more: return os << "consume_result::needs_more";
case consume_result::got_response: return os << "consume_result::got_response";
case consume_result::got_push: return os << "consume_result::got_push";
default: return os << "<unknown consume_result>";
}
}
} // namespace boost::redis::detail
// Prints a message on failure. Useful for parameterized tests
#define BOOST_TEST_EQ_MSG(lhs, rhs, msg) \
if (!BOOST_TEST_EQ(lhs, rhs)) { \
BOOST_LIGHTWEIGHT_TEST_OSTREAM << "Failure happened in context: " << msg << std::endl; \
}
namespace {
// A helper to create a request and its associated elem
struct elem_and_request {
request req;
std::size_t done_calls{0u}; // number of times the done callback has been invoked
std::shared_ptr<multiplexer::elem> elm;
std::weak_ptr<multiplexer::elem> weak_elm; // check that we free memory
static request make_request(request::config cfg)
{
request req{cfg};
// Empty requests are not valid. The request needs to be populated before creating the element
req.push("get", "mykey");
return req;
}
elem_and_request(request::config cfg = {})
: elem_and_request(make_request(cfg))
{ }
elem_and_request(request input_req)
: req(std::move(input_req))
{
elm = std::make_shared<multiplexer::elem>(req, any_adapter{});
elm->set_done_callback([this] {
++done_calls;
});
weak_elm = elm;
}
};
// The happy path
void test_success()
{
// Setup
connection_state st;
elem_and_request input;
exec_fsm fsm(std::move(input.elm));
error_code ec;
// Initiate
auto act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::setup_cancellation);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::notify_writer);
// We should now wait for a response
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful write
BOOST_TEST_EQ(st.mpx.prepare_write(), 1u); // one request was placed in the packet to write
BOOST_TEST(st.mpx.commit_write(st.mpx.get_write_buffer().size()));
// Simulate a successful read
read(st.mpx, "$5\r\nhello\r\n");
auto req_status = st.mpx.consume(ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
BOOST_TEST_EQ(input.done_calls, 1u);
// This will awaken the exec operation, and should complete the operation
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action(error_code(), 11u));
// All memory should have been freed by now
BOOST_TEST(input.weak_elm.expired());
}
// The request encountered an error while parsing
void test_parse_error()
{
// Setup
connection_state st;
elem_and_request input;
exec_fsm fsm(std::move(input.elm));
error_code ec;
// Initiate
auto act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::setup_cancellation);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::notify_writer);
// We should now wait for a response
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful write
BOOST_TEST_EQ(st.mpx.prepare_write(), 1u); // one request was placed in the packet to write
BOOST_TEST(st.mpx.commit_write(st.mpx.get_write_buffer().size()));
// Simulate a read that will trigger an error.
// The second field should be a number (rather than the empty string).
// Note that although part of the buffer was consumed, the multiplexer
// currently throws this information away.
read(st.mpx, "*2\r\n$5\r\nhello\r\n:\r\n");
auto req_status = st.mpx.consume(ec);
BOOST_TEST_EQ(ec, error::empty_field);
BOOST_TEST_EQ(req_status.second, 15u);
BOOST_TEST_EQ(input.done_calls, 1u);
// This will awaken the exec operation, and should complete the operation
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action(error::empty_field, 0u));
// All memory should have been freed by now
BOOST_TEST(input.weak_elm.expired());
}
// The request was configured to be cancelled on connection error, and the connection is closed
void test_cancel_if_not_connected()
{
// Setup
connection_state st;
request::config cfg;
cfg.cancel_if_not_connected = true;
elem_and_request input(cfg);
exec_fsm fsm(std::move(input.elm));
// Initiate. We're not connected, so the request gets cancelled
auto act = fsm.resume(false, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::immediate);
act = fsm.resume(false, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action(error::not_connected));
// We didn't leave memory behind
BOOST_TEST(input.weak_elm.expired());
}
// The connection is closed when we start the request, but the request was configured to wait
void test_not_connected()
{
// Setup
connection_state st;
elem_and_request input;
exec_fsm fsm(std::move(input.elm));
error_code ec;
// Initiate
auto act = fsm.resume(false, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::setup_cancellation);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::notify_writer);
// We should now wait for a response
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful write
BOOST_TEST_EQ(st.mpx.prepare_write(), 1u); // one request was placed in the packet to write
BOOST_TEST(st.mpx.commit_write(st.mpx.get_write_buffer().size()));
// Simulate a successful read
read(st.mpx, "$5\r\nhello\r\n");
auto req_status = st.mpx.consume(ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
BOOST_TEST_EQ(input.done_calls, 1u);
// This will awaken the exec operation, and should complete the operation
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action(error_code(), 11u));
// All memory should have been freed by now
BOOST_TEST(input.weak_elm.expired());
}
//
// Cancellations
//
// If the request is waiting, all cancellation types are supported
void test_cancel_waiting()
{
constexpr struct {
const char* name;
asio::cancellation_type_t type;
} test_cases[] = {
{"terminal", asio::cancellation_type_t::terminal },
{"partial", asio::cancellation_type_t::partial },
{"total", asio::cancellation_type_t::total },
{"mixed", asio::cancellation_type_t::partial | asio::cancellation_type_t::terminal},
{"all", asio::cancellation_type_t::all },
};
for (const auto& tc : test_cases) {
// Setup
connection_state st;
elem_and_request input, input2;
exec_fsm fsm(std::move(input.elm));
// Another request enters the multiplexer, so it's busy when we start
st.mpx.add(input2.elm);
BOOST_TEST_EQ_MSG(st.mpx.prepare_write(), 1u, tc.name);
// Initiate and wait
auto act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::setup_cancellation, tc.name);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::notify_writer, tc.name);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name);
// We get notified because the request got cancelled
act = fsm.resume(true, st, tc.type);
BOOST_TEST_EQ_MSG(act, exec_action(asio::error::operation_aborted), tc.name);
BOOST_TEST_EQ_MSG(input.weak_elm.expired(), true, tc.name); // we didn't leave memory behind
}
}
// If the request is being processed and terminal or partial
// cancellation is requested, we mark the request as abandoned
void test_cancel_notwaiting_terminal_partial()
{
constexpr struct {
const char* name;
asio::cancellation_type_t type;
} test_cases[] = {
{"terminal", asio::cancellation_type_t::terminal},
{"partial", asio::cancellation_type_t::partial },
};
for (const auto& tc : test_cases) {
// Setup
connection_state st;
auto input = std::make_unique<elem_and_request>();
exec_fsm fsm(std::move(input->elm));
// Initiate
auto act = fsm.resume(false, st, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::setup_cancellation, tc.name);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::notify_writer, tc.name);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name);
// The multiplexer starts writing the request
BOOST_TEST_EQ_MSG(st.mpx.prepare_write(), 1u, tc.name);
BOOST_TEST_EQ_MSG(st.mpx.commit_write(st.mpx.get_write_buffer().size()), true, tc.name);
// A cancellation arrives
act = fsm.resume(true, st, tc.type);
BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted));
input.reset(); // Verify we don't access the request or response after completion
error_code ec;
// When the response to this request arrives, it gets ignored
read(st.mpx, "-ERR wrong command\r\n");
auto res = st.mpx.consume(ec);
BOOST_TEST_EQ_MSG(ec, error_code(), tc.name);
BOOST_TEST_EQ_MSG(res.first, consume_result::got_response, tc.name);
// The multiplexer::elem object needs to survive here to mark the
// request as abandoned
}
}
// If the request is being processed and total cancellation is requested, we ignore the cancellation
void test_cancel_notwaiting_total()
{
// Setup
connection_state st;
elem_and_request input;
exec_fsm fsm(std::move(input.elm));
error_code ec;
// Initiate
auto act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::setup_cancellation);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::notify_writer);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful write
BOOST_TEST_EQ(st.mpx.prepare_write(), 1u);
BOOST_TEST(st.mpx.commit_write(st.mpx.get_write_buffer().size()));
// We got requested a cancellation here, but we can't honor it
act = fsm.resume(true, st, asio::cancellation_type_t::total);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful read
read(st.mpx, "$5\r\nhello\r\n");
auto req_status = st.mpx.consume(ec);
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(req_status.first, consume_result::got_response);
BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed
BOOST_TEST_EQ(input.done_calls, 1u);
// This will awaken the exec operation, and should complete the operation
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action(error_code(), 11u));
// All memory should have been freed by now
BOOST_TEST_EQ(input.weak_elm.expired(), true);
}
// If a request completes successfully and contained pubsub changes, these are committed
void test_subscription_tracking_success()
{
// Setup
request req;
req.subscribe({"ch1", "ch2"});
connection_state st;
elem_and_request input{std::move(req)};
exec_fsm fsm(std::move(input.elm));
// Initiate
auto act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::setup_cancellation);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::notify_writer);
// We should now wait for a response
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a successful write
BOOST_TEST_EQ(st.mpx.prepare_write(), 1u); // one request was placed in the packet to write
BOOST_TEST(st.mpx.commit_write(st.mpx.get_write_buffer().size()));
// The request doesn't have a response, so this will
// awaken the exec operation, and should complete the operation
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action(error_code(), 0u));
// All memory should have been freed by now
BOOST_TEST(input.weak_elm.expired());
// The subscription has been added to the tracker
request tracker_req;
st.tracker.compose_subscribe_request(tracker_req);
request expected_req;
expected_req.push("SUBSCRIBE", "ch1", "ch2");
BOOST_TEST_EQ(tracker_req.payload(), expected_req.payload());
}
// If the request errors, tracked subscriptions are not committed
void test_subscription_tracking_error()
{
// Setup
request req;
req.subscribe({"ch1", "ch2"});
connection_state st;
elem_and_request input{std::move(req)};
exec_fsm fsm(std::move(input.elm));
// Initiate
auto act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::setup_cancellation);
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::notify_writer);
// We should now wait for a response
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action_type::wait_for_response);
// Simulate a write error, which would trigger a reconnection
BOOST_TEST_EQ(st.mpx.prepare_write(), 1u); // one request was placed in the packet to write
st.mpx.cancel_on_conn_lost();
// This awakens the request
act = fsm.resume(true, st, cancellation_type_t::none);
BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted, 0u));
// All memory should have been freed by now
BOOST_TEST(input.weak_elm.expired());
// The subscription has not been added to the tracker
request tracker_req;
st.tracker.compose_subscribe_request(tracker_req);
BOOST_TEST_EQ(tracker_req.payload(), "");
}
} // namespace
int main()
{
test_success();
test_parse_error();
test_cancel_if_not_connected();
test_not_connected();
test_cancel_waiting();
test_cancel_notwaiting_terminal_partial();
test_cancel_notwaiting_total();
test_subscription_tracking_success();
test_subscription_tracking_error();
return boost::report_errors();
}