diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 60fdd448..b283af27 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -118,9 +118,6 @@ struct connection_impl { case exec_action_type::wait_for_response: notifier_->async_receive(std::move(self)); return; - case exec_action_type::cancel_run: - obj_->cancel(operation::run); - continue; // this action does not require yielding case exec_action_type::done: notifier_.reset(); self.complete(act.error(), act.bytes_read()); @@ -763,40 +760,48 @@ public: /** @brief Executes commands on the Redis server asynchronously. * - * This function sends a request to the Redis server and waits for - * the responses to each individual command in the request. If the - * request contains only commands that don't expect a response, - * the completion occurs after it has been written to the - * underlying stream. Multiple concurrent calls to this function - * will be automatically queued by the implementation. + * This function sends a request to the Redis server and waits for + * the responses to each individual command in the request. If the + * request contains only commands that don't expect a response, + * the completion occurs after it has been written to the + * underlying stream. Multiple concurrent calls to this function + * will be automatically queued by the implementation. * - * For an example see cpp20_echo_server.cpp. + * For an example see cpp20_echo_server.cpp. * * The completion token must have the following signature: * - * @code - * void f(system::error_code, std::size_t); - * @endcode + * @code + * void f(system::error_code, std::size_t); + * @endcode * - * Where the second parameter is the size of the response received - * in bytes. + * Where the second parameter is the size of the response received + * in bytes. * * @par Per-operation cancellation - * This operation supports per-operation cancellation. The following cancellation types - * are supported: + * This operation supports per-operation cancellation. Depending on the state of the request + * when cancellation is requested, we can encounter two scenarios: * - * @li `asio::cancellation_type_t::terminal`. Always supported. May cause the current - * `async_run` operation to be cancelled. - * @li `asio::cancellation_type_t::partial` and `asio::cancellation_type_t::total`. - * Supported only if the request hasn't been written to the network yet. + * @li If the request hasn't been sent to the server yet, cancellation will prevent it + * from being sent to the server. In this situation, all cancellation types are supported + * (`asio::cancellation_type_t::terminal`, `asio::cancellation_type_t::partial` and + * `asio::cancellation_type_t::total`). + * @li If the request has been sent to the server but the response hasn't arrived yet, + * cancellation will cause `async_exec` to complete immediately. When the response + * arrives from the server, it will be ignored. In this situation, only + * `asio::cancellation_type_t::terminal` and `asio::cancellation_type_t::partial` + * are supported. Cancellation requests specifying `asio::cancellation_type_t::total` + * only will be ignored. + * + * In any case, connections can be safely used after cancelling `async_exec` operations. * * @par Object lifetimes * Both `req` and `res` should be kept alive until the operation completes. * No copies of the request object are made. * - * @param req The request to be executed. - * @param resp The response object to parse data into. - * @param token Completion token. + * @param req The request to be executed. + * @param resp The response object to parse data into. + * @param token Completion token. */ template < class Response = ignore_t, @@ -808,41 +813,49 @@ public: /** @brief Executes commands on the Redis server asynchronously. * - * This function sends a request to the Redis server and waits for - * the responses to each individual command in the request. If the - * request contains only commands that don't expect a response, - * the completion occurs after it has been written to the - * underlying stream. Multiple concurrent calls to this function - * will be automatically queued by the implementation. + * This function sends a request to the Redis server and waits for + * the responses to each individual command in the request. If the + * request contains only commands that don't expect a response, + * the completion occurs after it has been written to the + * underlying stream. Multiple concurrent calls to this function + * will be automatically queued by the implementation. * - * For an example see cpp20_echo_server.cpp. + * For an example see cpp20_echo_server.cpp. * * The completion token must have the following signature: * - * @code - * void f(system::error_code, std::size_t); - * @endcode + * @code + * void f(system::error_code, std::size_t); + * @endcode * - * Where the second parameter is the size of the response received - * in bytes. + * Where the second parameter is the size of the response received + * in bytes. * * @par Per-operation cancellation - * This operation supports per-operation cancellation. The following cancellation types - * are supported: + * This operation supports per-operation cancellation. Depending on the state of the request + * when cancellation is requested, we can encounter two scenarios: * - * @li `asio::cancellation_type_t::terminal`. Always supported. May cause the current - * `async_run` operation to be cancelled. - * @li `asio::cancellation_type_t::partial` and `asio::cancellation_type_t::total`. - * Supported only if the request hasn't been written to the network yet. + * @li If the request hasn't been sent to the server yet, cancellation will prevent it + * from being sent to the server. In this situation, all cancellation types are supported + * (`asio::cancellation_type_t::terminal`, `asio::cancellation_type_t::partial` and + * `asio::cancellation_type_t::total`). + * @li If the request has been sent to the server but the response hasn't arrived yet, + * cancellation will cause `async_exec` to complete immediately. When the response + * arrives from the server, it will be ignored. In this situation, only + * `asio::cancellation_type_t::terminal` and `asio::cancellation_type_t::partial` + * are supported. Cancellation requests specifying `asio::cancellation_type_t::total` + * only will be ignored. + * + * In any case, connections can be safely used after cancelling `async_exec` operations. * * @par Object lifetimes * Both `req` and any response object referenced by `adapter` * should be kept alive until the operation completes. * No copies of the request object are made. * - * @param req The request to be executed. - * @param adapter An adapter object referencing a response to place data into. - * @param token Completion token. + * @param req The request to be executed. + * @param adapter An adapter object referencing a response to place data into. + * @param token Completion token. */ template > auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token = {}) diff --git a/include/boost/redis/detail/exec_fsm.hpp b/include/boost/redis/detail/exec_fsm.hpp index 52ec767b..3727d07c 100644 --- a/include/boost/redis/detail/exec_fsm.hpp +++ b/include/boost/redis/detail/exec_fsm.hpp @@ -29,7 +29,6 @@ enum class exec_action_type done, // Call the final handler notify_writer, // Notify the writer task wait_for_response, // Wait to be notified - cancel_run, // Cancel the connection's run operation }; class exec_action { diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index aa3f77a0..06400918 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -95,6 +95,16 @@ public: auto get_adapter() -> any_adapter& { return adapter_; } + // Marks the element as an abandoned request. An abandoned request + // won't cause problems when its response arrives, but that response will be ignored. + void mark_abandoned(); + + [[nodiscard]] + bool is_abandoned() const + { + return !req_; + } + private: enum class status { @@ -138,7 +148,7 @@ public: -> std::pair; auto add(std::shared_ptr const& ptr) -> void; - auto remove(std::shared_ptr const& ptr) -> bool; + void cancel(std::shared_ptr const& ptr); auto reset() -> void; [[nodiscard]] diff --git a/include/boost/redis/impl/connection_logger.ipp b/include/boost/redis/impl/connection_logger.ipp index effa6b9d..2ce42420 100644 --- a/include/boost/redis/impl/connection_logger.ipp +++ b/include/boost/redis/impl/connection_logger.ipp @@ -42,7 +42,6 @@ auto to_string(exec_action_type t) noexcept -> char const* BOOST_REDIS_EXEC_SWITCH_CASE(done); BOOST_REDIS_EXEC_SWITCH_CASE(notify_writer); BOOST_REDIS_EXEC_SWITCH_CASE(wait_for_response); - BOOST_REDIS_EXEC_SWITCH_CASE(cancel_run); default: return "exec_action_type::"; } } diff --git a/include/boost/redis/impl/exec_fsm.ipp b/include/boost/redis/impl/exec_fsm.ipp index 25675e02..b7be8c60 100644 --- a/include/boost/redis/impl/exec_fsm.ipp +++ b/include/boost/redis/impl/exec_fsm.ipp @@ -18,11 +18,14 @@ namespace boost::redis::detail { -inline bool is_cancellation(asio::cancellation_type_t type) +inline bool is_partial_or_terminal_cancel(asio::cancellation_type_t type) { - return !!( - type & (asio::cancellation_type_t::total | asio::cancellation_type_t::partial | - asio::cancellation_type_t::terminal)); + return !!(type & (asio::cancellation_type_t::partial | asio::cancellation_type_t::terminal)); +} + +inline bool is_total_cancel(asio::cancellation_type_t type) +{ + return !!(type & asio::cancellation_type_t::total); } exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t cancel_state) @@ -63,19 +66,12 @@ exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t return act; } - // If we're cancelled, try to remove the request from the queue. This will only - // succeed if the request is waiting (wasn't written yet) - if (is_cancellation(cancel_state) && mpx_->remove(elem_)) { - elem_.reset(); // Deallocate memory before finalizing - return exec_action{asio::error::operation_aborted}; - } - - // If we hit a terminal cancellation, tear down the connection. - // Otherwise, go back to waiting. - // TODO: we could likely do better here and mark the request as cancelled, removing - // the done callback and the adapter. But this requires further exploration - if (!!(cancel_state & asio::cancellation_type_t::terminal)) { - BOOST_REDIS_YIELD(resume_point_, 5, exec_action_type::cancel_run) + // Total cancellation can only be handled if the request hasn't been sent yet. + // Partial and terminal cancellation can always be served + if ( + (is_total_cancel(cancel_state) && elem_->is_waiting()) || + is_partial_or_terminal_cancel(cancel_state)) { + mpx_->cancel(elem_); elem_.reset(); // Deallocate memory before finalizing return exec_action{asio::error::operation_aborted}; } diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp index 74e2e03d..90168f84 100644 --- a/include/boost/redis/impl/multiplexer.ipp +++ b/include/boost/redis/impl/multiplexer.ipp @@ -5,6 +5,7 @@ */ #include +#include #include #include @@ -38,14 +39,23 @@ auto multiplexer::elem::commit_response(std::size_t read_size) -> void --remaining_responses_; } -bool multiplexer::remove(std::shared_ptr const& ptr) +void multiplexer::elem::mark_abandoned() +{ + req_ = nullptr; + adapter_ = any_adapter(); // A default-constructed any_adapter ignores all nodes + set_done_callback([] { }); +} + +void multiplexer::cancel(std::shared_ptr const& ptr) { if (ptr->is_waiting()) { + // We can safely remove it from the queue, since it hasn't been sent yet reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), ptr)); - return true; + } else { + // Removing the request would cause trouble when the response arrived. + // Mark it as abandoned, so the response is discarded when it arrives + ptr->mark_abandoned(); } - - return false; } std::size_t multiplexer::commit_write() @@ -70,6 +80,8 @@ std::size_t multiplexer::commit_write() void multiplexer::add(std::shared_ptr const& info) { + BOOST_ASSERT(!info->is_abandoned()); + reqs_.push_back(info); if (request_access::has_priority(info->get_request())) { @@ -171,8 +183,9 @@ std::size_t multiplexer::prepare_write() return !ri->is_waiting(); }); - std::for_each(point, std::cend(reqs_), [this](auto const& ri) { + std::for_each(point, std::cend(reqs_), [this](const std::shared_ptr& ri) { // Stage the request. + BOOST_ASSERT(!ri->is_abandoned()); write_buffer_ += ri->get_request().payload(); ri->mark_staged(); usage_.commands_sent += ri->get_request().get_commands(); @@ -211,9 +224,14 @@ void multiplexer::cancel_on_conn_lost() cancel_run_called_ = true; // Must return false if the request should be removed. - auto cond = [](auto const& ptr) { + auto cond = [](const std::shared_ptr& ptr) { BOOST_ASSERT(ptr != nullptr); + // Abandoned requests only make sense because a response for them might arrive. + // They should be discarded after the connection is lost + if (ptr->is_abandoned()) + return false; + if (ptr->is_waiting()) { return !ptr->get_request().get_config().cancel_on_connection_lost; } else { @@ -284,9 +302,12 @@ bool multiplexer::is_next_push(std::string_view data) const noexcept std::size_t multiplexer::release_push_requests() { - auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) { - return !(ptr->is_written() && ptr->get_request().get_expected_responses() == 0); - }); + auto point = std::stable_partition( + std::begin(reqs_), + std::end(reqs_), + [](const std::shared_ptr& ptr) { + return !(ptr->is_written() && ptr->get_remaining_responses() == 0u); + }); std::for_each(point, std::end(reqs_), [](auto const& ptr) { ptr->notify_done(); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8a47244f..cd2a7a81 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -54,7 +54,6 @@ make_test(test_conn_exec) make_test(test_conn_push) make_test(test_conn_reconnect) make_test(test_conn_exec_cancel) -make_test(test_conn_exec_cancel2) make_test(test_conn_echo_stress) make_test(test_conn_move) make_test(test_conn_setup) diff --git a/test/test_conn_exec.cpp b/test/test_conn_exec.cpp index dd9efc34..4ac3bba9 100644 --- a/test/test_conn_exec.cpp +++ b/test/test_conn_exec.cpp @@ -122,25 +122,6 @@ BOOST_AUTO_TEST_CASE(wrong_response_data_type) BOOST_TEST(finished); } -BOOST_AUTO_TEST_CASE(cancel_request_if_not_connected) -{ - request req; - req.get_config().cancel_if_not_connected = true; - req.push("PING"); - - net::io_context ioc; - auto conn = std::make_shared(ioc); - bool finished = false; - conn->async_exec(req, ignore, [conn, &finished](error_code ec, std::size_t) { - BOOST_TEST(ec, boost::redis::error::not_connected); - conn->cancel(); - finished = true; - }); - - ioc.run_for(test_timeout); - BOOST_TEST(finished); -} - BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170) { // See https://github.com/boostorg/redis/issues/170 diff --git a/test/test_conn_exec_cancel.cpp b/test/test_conn_exec_cancel.cpp index 9c32a8cb..6bb1ec30 100644 --- a/test/test_conn_exec_cancel.cpp +++ b/test/test_conn_exec_cancel.cpp @@ -7,33 +7,27 @@ #include #include #include +#include +#include #include +#include #include #include #include #include +#include #include -#include -#define BOOST_TEST_MODULE conn_exec_cancel -#include -#include - #include "common.hpp" -#ifdef BOOST_ASIO_HAS_CO_AWAIT -#include +#include +#include using namespace std::chrono_literals; -// NOTE1: I have observed that if hello and -// blpop are sent together, Redis will send the response of hello -// right away, not waiting for blpop. - namespace net = boost::asio; using error_code = boost::system::error_code; -using namespace net::experimental::awaitable_operators; using boost::redis::operation; using boost::redis::error; using boost::redis::request; @@ -45,95 +39,15 @@ using boost::redis::logger; using boost::redis::connection; using namespace std::chrono_literals; +// TODO: replace this by connection once it supports asio::cancel_after +// See https://github.com/boostorg/redis/issues/226 +using connection_type = boost::redis::basic_connection; + namespace { -auto implicit_cancel_of_req_written() -> net::awaitable -{ - auto ex = co_await net::this_coro::executor; - auto conn = std::make_shared(ex); - - auto cfg = make_test_config(); - cfg.health_check_interval = std::chrono::seconds::zero(); - run(conn, cfg); - - // See NOTE1. - request req0; - req0.push("PING"); - co_await conn->async_exec(req0, ignore); - - // Will be cancelled after it has been written but before the - // response arrives. - request req1; - req1.push("BLPOP", "any", 3); - - net::steady_timer st{ex}; - st.expires_after(std::chrono::seconds{1}); - - // Achieves implicit cancellation when the timer fires. - boost::system::error_code ec1, ec2; - co_await (conn->async_exec(req1, ignore, redir(ec1)) || st.async_wait(redir(ec2))); - - conn->cancel(); - - // I have observed this produces terminal cancellation so it can't - // be ignored, an error is expected. - BOOST_TEST(ec1 == net::error::operation_aborted); - BOOST_TEST(ec2 == error_code()); -} - -BOOST_AUTO_TEST_CASE(test_ignore_implicit_cancel_of_req_written) -{ - run_coroutine_test(implicit_cancel_of_req_written()); -} - -BOOST_AUTO_TEST_CASE(test_cancel_of_req_written_on_run_canceled) -{ - net::io_context ioc; - auto conn = std::make_shared(ioc); - - request req0; - req0.push("PING"); - - // Sends a request that will be blocked forever, so we can test - // canceling it while waiting for a response. - request req1; - req1.get_config().cancel_on_connection_lost = true; - req1.get_config().cancel_if_unresponded = true; - req1.push("BLPOP", "any", 0); - - bool finished = false; - - auto c1 = [&](error_code ec, std::size_t) { - BOOST_CHECK_EQUAL(ec, net::error::operation_aborted); - finished = true; - }; - - auto c0 = [&](error_code ec, std::size_t) { - BOOST_TEST(ec == error_code()); - conn->async_exec(req1, ignore, c1); - }; - - conn->async_exec(req0, ignore, c0); - - auto cfg = make_test_config(); - cfg.health_check_interval = std::chrono::seconds{5}; - run(conn); - - net::steady_timer st{ioc}; - st.expires_after(std::chrono::seconds{1}); - st.async_wait([&](error_code ec) { - BOOST_TEST(ec == error_code()); - conn->cancel(operation::run); - conn->cancel(operation::reconnection); - }); - - ioc.run_for(test_timeout); - BOOST_TEST(finished); -} - // We can cancel requests that haven't been written yet. // All cancellation types are supported here. -BOOST_AUTO_TEST_CASE(test_cancel_pending) +void test_cancel_pending() { struct { const char* name; @@ -145,38 +59,247 @@ BOOST_AUTO_TEST_CASE(test_cancel_pending) }; for (const auto& tc : test_cases) { - BOOST_TEST_CONTEXT(tc.name) - { - // Setup - net::io_context ctx; - connection conn(ctx); - request req; - req.push("get", "mykey"); + std::cerr << "Running test case: " << tc.name << std::endl; - // Issue a request without calling async_run(), so the request stays waiting forever - net::cancellation_signal sig; - bool called = false; - conn.async_exec( - req, - ignore, - net::bind_cancellation_slot(sig.slot(), [&](error_code ec, std::size_t sz) { - BOOST_TEST(ec == net::error::operation_aborted); - BOOST_TEST(sz == 0u); - called = true; - })); + // Setup + net::io_context ctx; + connection conn(ctx); + request req; + req.push("get", "mykey"); - // Issue a cancellation - sig.emit(tc.cancel_type); + // Issue a request without calling async_run(), so the request stays waiting forever + net::cancellation_signal sig; + bool called = false; + conn.async_exec( + req, + ignore, + net::bind_cancellation_slot(sig.slot(), [&](error_code ec, std::size_t sz) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + BOOST_TEST_EQ(sz, 0u); + called = true; + })); - // Prevent the test for deadlocking in case of failure - ctx.run_for(3s); - BOOST_TEST(called); - } + // Issue a cancellation + sig.emit(tc.cancel_type); + + // Prevent the test for deadlocking in case of failure + ctx.run_for(test_timeout); + BOOST_TEST(called); } } +// We can cancel requests that have been written but which +// responses haven't been received yet. +// Terminal and partial cancellation types are supported here. +void test_cancel_written() +{ + // Setup + net::io_context ctx; + connection_type conn{ctx}; + auto cfg = make_test_config(); + cfg.health_check_interval = std::chrono::seconds::zero(); + bool run_finished = false, exec1_finished = false, exec2_finished = false, + exec3_finished = false; + + // Will be cancelled after it has been written but before the + // response arrives. Create everything in dynamic memory to verify + // we don't try to access things after completion. + auto req1 = std::make_unique(); + req1->push("BLPOP", "any", 1); + auto r1 = std::make_unique>(); + + // Will be cancelled too because it's sent after BLPOP. + // Tests that partial cancellation is supported, too. + request req2; + req2.push("PING", "partial_cancellation"); + + // Will finish successfully once the response to the BLPOP arrives + request req3; + req3.push("PING", "after_blpop"); + response r3; + + // Run the connection + conn.async_run(cfg, [&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + run_finished = true; + }); + + // The request will be cancelled before it receives a response. + // Our BLPOP will wait for longer than the timeout we're using. + // Clear allocated memory to check we don't access the request or + // response when the server response arrives. + auto blpop_cb = [&](error_code ec, std::size_t) { + req1.reset(); + r1.reset(); + BOOST_TEST_EQ(ec, net::error::operation_aborted); + exec1_finished = true; + }; + conn.async_exec(*req1, *r1, net::cancel_after(500ms, blpop_cb)); + + // The first PING will be cancelled, too. Use partial cancellation here. + auto req2_cb = [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + exec2_finished = true; + }; + conn.async_exec( + req2, + ignore, + net::cancel_after(500ms, net::cancellation_type_t::partial, req2_cb)); + + // The second PING's response will be received after the BLPOP's response, + // but it will be processed successfully. + conn.async_exec(req3, r3, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(std::get<0>(r3).value(), "after_blpop"); + conn.cancel(); + exec3_finished = true; + }); + + ctx.run_for(test_timeout); + BOOST_TEST(run_finished); + BOOST_TEST(exec1_finished); + BOOST_TEST(exec2_finished); + BOOST_TEST(exec3_finished); +} + +// Requests configured to do so are cancelled if the connection +// hasn't been established when they are executed +void test_cancel_if_not_connected() +{ + net::io_context ioc; + connection conn{ioc}; + + request req; + req.get_config().cancel_if_not_connected = true; + req.push("PING"); + + bool exec_finished = false; + conn.async_exec(req, ignore, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error::not_connected); + exec_finished = true; + }); + + ioc.run_for(test_timeout); + BOOST_TEST(exec_finished); +} + +// Requests configured to do so are cancelled when the connection is lost. +// Tests with a written request that hasn't been responded yet +void test_cancel_on_connection_lost_written() +{ + // Setup + net::io_context ioc; + connection conn{ioc}; + + // req0 and req1 will be coalesced together. When req0 + // completes, we know that req1 will be waiting for a response. + // req1 will block forever. + request req0; + req0.push("PING"); + + request req1; + req1.get_config().cancel_on_connection_lost = true; + req1.get_config().cancel_if_unresponded = true; + req1.push("BLPOP", "any", 0); + + bool run_finished = false, exec0_finished = false, exec1_finished = false; + + // Run the connection + auto cfg = make_test_config(); + conn.async_run(cfg, [&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + run_finished = true; + }); + + // Execute both requests + conn.async_exec(req0, ignore, [&](error_code ec, std::size_t) { + // The request finished successfully + BOOST_TEST_EQ(ec, error_code()); + exec0_finished = true; + + // We know that req1 has been written to the server, too. Trigger a cancellation + conn.cancel(operation::run); + conn.cancel(operation::reconnection); + }); + + conn.async_exec(req1, ignore, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + exec1_finished = true; + }); + + ioc.run_for(test_timeout); + BOOST_TEST(run_finished); + BOOST_TEST(exec0_finished); + BOOST_TEST(exec1_finished); +} + +// connection::cancel(operation::exec) works. Pending requests are cancelled, +// but written requests are not +void test_cancel_operation_exec() +{ + // Setup + net::io_context ctx; + connection conn{ctx}; + bool run_finished = false, exec0_finished = false, exec1_finished = false, + exec2_finished = false; + + request req0; + req0.push("PING", "before_blpop"); + + request req1; + req1.push("BLPOP", "any", 1); + generic_response r1; + + request req2; + req2.push("PING", "after_blpop"); + + // Run the connection + conn.async_run(make_test_config(), [&](error_code ec) { + BOOST_TEST_EQ(ec, net::error::operation_aborted); + run_finished = true; + }); + + // Execute req0 and req1. They will be coalesced together. + // When req0 completes, we know that req1 will be waiting its response + conn.async_exec(req0, ignore, [&](error_code ec, std::size_t) { + BOOST_TEST_EQ(ec, error_code()); + exec0_finished = true; + conn.cancel(operation::exec); + }); + + // By default, ignore will issue an error when a NULL is received. + // ATM, this causes the connection to be torn down. Using a generic_response avoids this. + // See https://github.com/boostorg/redis/issues/314 + conn.async_exec(req1, r1, [&](error_code ec, std::size_t) { + // No error should occur since the cancellation should be ignored + std::cout << "async_exec (1): " << ec.message() << std::endl; + BOOST_TEST_EQ(ec, error_code()); + exec1_finished = true; + + // The connection remains usable + conn.async_exec(req2, ignore, [&](error_code ec2, std::size_t) { + BOOST_TEST_EQ(ec2, error_code()); + exec2_finished = true; + conn.cancel(); + }); + }); + + ctx.run_for(test_timeout); + BOOST_TEST(run_finished); + BOOST_TEST(exec0_finished); + BOOST_TEST(exec1_finished); + BOOST_TEST(exec2_finished); +} + } // namespace -#else -BOOST_AUTO_TEST_CASE(dummy) { } -#endif +int main() +{ + test_cancel_pending(); + test_cancel_written(); + test_cancel_if_not_connected(); + test_cancel_on_connection_lost_written(); + test_cancel_operation_exec(); + + return boost::report_errors(); +} diff --git a/test/test_conn_exec_cancel2.cpp b/test/test_conn_exec_cancel2.cpp deleted file mode 100644 index 7b745fc6..00000000 --- a/test/test_conn_exec_cancel2.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) - * - * Distributed under the Boost Software License, Version 1.0. (See - * accompanying file LICENSE.txt) - */ - -#include - -#include -#define BOOST_TEST_MODULE conn_exec_cancel -#include - -#include "common.hpp" - -#include - -#ifdef BOOST_ASIO_HAS_CO_AWAIT - -// NOTE1: Sends hello separately. I have observed that if hello and -// blpop are sent toguether, Redis will send the response of hello -// right away, not waiting for blpop. That is why we have to send it -// separately. - -namespace net = boost::asio; -using error_code = boost::system::error_code; -using boost::redis::operation; -using boost::redis::request; -using boost::redis::response; -using boost::redis::generic_response; -using boost::redis::ignore; -using boost::redis::ignore_t; -using boost::redis::config; -using boost::redis::logger; -using boost::redis::connection; -using namespace std::chrono_literals; - -namespace { - -auto async_ignore_explicit_cancel_of_req_written() -> net::awaitable -{ - auto ex = co_await net::this_coro::executor; - - generic_response gresp; - auto conn = std::make_shared(ex); - - run(conn); - - net::steady_timer st{ex}; - st.expires_after(std::chrono::seconds{1}); - - // See NOTE1. - request req0; - req0.push("PING", "async_ignore_explicit_cancel_of_req_written"); - co_await conn->async_exec(req0, gresp); - - request req1; - req1.push("BLPOP", "any", 3); - - bool seen = false; - conn->async_exec(req1, gresp, [&](error_code ec, std::size_t) { - // No error should occur since the cancellation should be ignored - std::cout << "async_exec (1): " << ec.message() << std::endl; - BOOST_TEST(ec == error_code()); - seen = true; - }); - - // Will complete while BLPOP is pending. - error_code ec; - co_await st.async_wait(net::redirect_error(ec)); - conn->cancel(operation::exec); - - BOOST_TEST(ec == error_code()); - - request req2; - req2.push("PING"); - - // Test whether the connection remains usable after a call to - // cancel(exec). - co_await conn->async_exec(req2, gresp, net::redirect_error(ec)); - conn->cancel(); - - BOOST_TEST(ec == error_code()); - BOOST_TEST(seen); -} - -BOOST_AUTO_TEST_CASE(test_ignore_explicit_cancel_of_req_written) -{ - run_coroutine_test(async_ignore_explicit_cancel_of_req_written()); -} - -} // namespace - -#else -BOOST_AUTO_TEST_CASE(dummy) { } -#endif diff --git a/test/test_conn_reconnect.cpp b/test/test_conn_reconnect.cpp index ab172af6..2a9ea337 100644 --- a/test/test_conn_reconnect.cpp +++ b/test/test_conn_reconnect.cpp @@ -44,6 +44,7 @@ net::awaitable test_reconnect_impl() request regular_req; regular_req.push("PING", "SomeValue"); regular_req.get_config().cancel_on_connection_lost = false; + regular_req.get_config().cancel_if_unresponded = false; auto conn = std::make_shared(ex); auto cfg = make_test_config(); diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp index faaa5997..f8a83ce2 100644 --- a/test/test_exec_fsm.cpp +++ b/test/test_exec_fsm.cpp @@ -281,56 +281,26 @@ void test_cancel_waiting() } } -// If the request is being processed and terminal cancellation got requested, we cancel the connection -void test_cancel_notwaiting_terminal() -{ - // Setup - multiplexer mpx; - elem_and_request input; - exec_fsm fsm(mpx, std::move(input.elm)); - - // Initiate - auto act = fsm.resume(false, cancellation_type_t::none); - BOOST_TEST_EQ(act, exec_action_type::setup_cancellation); - act = fsm.resume(true, cancellation_type_t::none); - BOOST_TEST_EQ(act, exec_action_type::notify_writer); - - act = fsm.resume(true, cancellation_type_t::none); - BOOST_TEST_EQ(act, exec_action_type::wait_for_response); - - // The multiplexer starts writing the request - BOOST_TEST_EQ(mpx.prepare_write(), 1u); // one request was placed in the packet to write - - // A cancellation arrives - act = fsm.resume(true, cancellation_type_t::terminal); - BOOST_TEST_EQ(act, exec_action_type::cancel_run); - act = fsm.resume(true, cancellation_type_t::terminal); - BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted)); - - // The object needs to survive here, otherwise an inconsistent connection state is created -} - -// If the request is being processed and other types of cancellation got requested, we ignore the cancellation -void test_cancel_notwaiting_notterminal() +// If the request is being processed and terminal or partial +// cancellation is requested, we mark the request as abandoned +void test_cancel_notwaiting_terminal_partial() { constexpr struct { const char* name; asio::cancellation_type_t type; } test_cases[] = { - {"partial", asio::cancellation_type_t::partial }, - {"total", asio::cancellation_type_t::total }, - {"mixed", asio::cancellation_type_t::partial | asio::cancellation_type_t::total}, + {"terminal", asio::cancellation_type_t::terminal}, + {"partial", asio::cancellation_type_t::partial }, }; for (const auto& tc : test_cases) { // Setup multiplexer mpx; - elem_and_request input; - exec_fsm fsm(mpx, std::move(input.elm)); - error_code ec; + auto input = std::make_unique(); + exec_fsm fsm(mpx, std::move(input->elm)); // Initiate - auto act = fsm.resume(true, cancellation_type_t::none); + auto act = fsm.resume(false, cancellation_type_t::none); BOOST_TEST_EQ_MSG(act, exec_action_type::setup_cancellation, tc.name); act = fsm.resume(true, cancellation_type_t::none); BOOST_TEST_EQ_MSG(act, exec_action_type::notify_writer, tc.name); @@ -338,30 +308,67 @@ void test_cancel_notwaiting_notterminal() act = fsm.resume(true, cancellation_type_t::none); BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name); - // Simulate a successful write + // The multiplexer starts writing the request BOOST_TEST_EQ_MSG(mpx.prepare_write(), 1u, tc.name); - BOOST_TEST_EQ_MSG(mpx.commit_write(), 0u, tc.name); // all requests expect a response + BOOST_TEST_EQ_MSG(mpx.commit_write(), 0u, tc.name); - // We got requested a cancellation here, but we can't honor it + // A cancellation arrives act = fsm.resume(true, tc.type); - BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name); + BOOST_TEST_EQ(act, exec_action(asio::error::operation_aborted)); + input.reset(); // Verify we don't access the request or response after completion - // Simulate a successful read - auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); + // When the response to this request arrives, it gets ignored + error_code ec; + auto res = mpx.consume_next("-ERR wrong command\r\n", ec); BOOST_TEST_EQ_MSG(ec, error_code(), tc.name); - BOOST_TEST_EQ_MSG(req_status.first, consume_result::got_response, tc.name); - BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed - BOOST_TEST_EQ_MSG(input.done_calls, 1u, tc.name); + BOOST_TEST_EQ_MSG(res.first, consume_result::got_response, tc.name); - // This will awaken the exec operation, and should complete the operation - act = fsm.resume(true, cancellation_type_t::none); - BOOST_TEST_EQ_MSG(act, exec_action(error_code(), 11u), tc.name); - - // All memory should have been freed by now - BOOST_TEST_EQ_MSG(input.weak_elm.expired(), true, tc.name); + // The multiplexer::elem object needs to survive here to mark the + // request as abandoned } } +// If the request is being processed and total cancellation is requested, we ignore the cancellation +void test_cancel_notwaiting_total() +{ + // Setup + multiplexer mpx; + elem_and_request input; + exec_fsm fsm(mpx, std::move(input.elm)); + error_code ec; + + // Initiate + auto act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::setup_cancellation); + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::notify_writer); + + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action_type::wait_for_response); + + // Simulate a successful write + BOOST_TEST_EQ(mpx.prepare_write(), 1u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response + + // We got requested a cancellation here, but we can't honor it + act = fsm.resume(true, asio::cancellation_type_t::total); + BOOST_TEST_EQ(act, exec_action_type::wait_for_response); + + // Simulate a successful read + auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_EQ(req_status.first, consume_result::got_response); + BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed + BOOST_TEST_EQ(input.done_calls, 1u); + + // This will awaken the exec operation, and should complete the operation + act = fsm.resume(true, cancellation_type_t::none); + BOOST_TEST_EQ(act, exec_action(error_code(), 11u)); + + // All memory should have been freed by now + BOOST_TEST_EQ(input.weak_elm.expired(), true); +} + } // namespace int main() @@ -371,8 +378,8 @@ int main() test_cancel_if_not_connected(); test_not_connected(); test_cancel_waiting(); - test_cancel_notwaiting_terminal(); - test_cancel_notwaiting_notterminal(); + test_cancel_notwaiting_terminal_partial(); + test_cancel_notwaiting_total(); return boost::report_errors(); } diff --git a/test/test_multiplexer.cpp b/test/test_multiplexer.cpp index fd8d3aa2..b7d26fd8 100644 --- a/test/test_multiplexer.cpp +++ b/test/test_multiplexer.cpp @@ -12,10 +12,10 @@ #include #include +#include #include #include -#include #include #include #include @@ -65,20 +65,48 @@ struct test_item { std::shared_ptr elem_ptr; bool done = false; - test_item(bool cmd_with_response = true) + static request make_request(bool cmd_with_response = true) { + request ret; + // The exact command is irrelevant because it is not being sent // to Redis. - req.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg"); + ret.push(cmd_with_response ? "PING" : "SUBSCRIBE", "cmd-arg"); + return ret; + } + + explicit test_item(request request_value) + : req{std::move(request_value)} + { elem_ptr = std::make_shared(req, any_adapter{resp}); elem_ptr->set_done_callback([this]() { done = true; }); } + + test_item(bool cmd_with_response = true) + : test_item(make_request(cmd_with_response)) + { } }; +void check_response( + const generic_response& actual, + boost::span expected, + boost::source_location loc = BOOST_CURRENT_LOCATION) +{ + if (!BOOST_TEST(actual.has_value())) { + std::cerr << "Response has error: " << actual.error().diagnostic << "\n" + << "Called from " << loc << std::endl; + return; + } + + if (!BOOST_TEST_ALL_EQ(actual->begin(), actual->end(), expected.begin(), expected.end())) { + std::cerr << "Called from " << loc << std::endl; + } +} + void test_request_needs_more() { // Setup @@ -100,16 +128,10 @@ void test_request_needs_more() // Parse the rest of it ret = mpx.consume_next("$11\r\nhello world\r\n", ec); BOOST_TEST_EQ(ret.first, consume_result::got_response); - BOOST_TEST(item1.resp.has_value()); - const node expected[] = { {type::blob_string, 1u, 0u, "hello world"}, }; - BOOST_TEST_ALL_EQ( - item1.resp->begin(), - item1.resp->end(), - std::begin(expected), - std::end(expected)); + check_response(item1.resp, expected); } void test_several_requests() @@ -239,14 +261,12 @@ void test_push() // Check BOOST_TEST_EQ(ret.first, consume_result::got_push); BOOST_TEST_EQ(ret.second, 16u); - BOOST_TEST(resp.has_value()); - const node expected[] = { {type::push, 2u, 0u, "" }, {type::simple_string, 1u, 1u, "one"}, {type::simple_string, 1u, 1u, "two"}, }; - BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected)); + check_response(resp, expected); } void test_push_needs_more() @@ -272,14 +292,12 @@ void test_push_needs_more() BOOST_TEST_EQ(ret.first, consume_result::got_push); BOOST_TEST_EQ(ret.second, 16u); - BOOST_TEST(resp.has_value()); - const node expected[] = { {type::push, 2u, 0u, "" }, {type::simple_string, 1u, 1u, "one"}, {type::simple_string, 1u, 1u, "two"}, }; - BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected)); + check_response(resp, expected); } // If a response is received and no request is waiting, it is interpreted @@ -298,12 +316,10 @@ void test_push_heuristics_no_request() // Check BOOST_TEST_EQ(ret.first, consume_result::got_push); BOOST_TEST_EQ(ret.second, 14u); - BOOST_TEST(resp.has_value()); - const node expected[] = { {type::simple_string, 1u, 0u, "Hello world"}, }; - BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected)); + check_response(resp, expected); } // Same, but there's another request that hasn't been written yet. @@ -326,12 +342,10 @@ void test_push_heuristics_request_waiting() // Check BOOST_TEST_EQ(ret.first, consume_result::got_push); BOOST_TEST_EQ(ret.second, 14u); - BOOST_TEST(resp.has_value()); - const node expected[] = { {type::simple_string, 1u, 0u, "Hello world"}, }; - BOOST_TEST_ALL_EQ(resp->begin(), resp->end(), std::begin(expected), std::end(expected)); + check_response(resp, expected); } // If a response is received and the first request doesn't expect a response, @@ -385,13 +399,12 @@ void test_mix_responses_pushes() auto ret = mpx.consume_next(push1_buffer, ec); BOOST_TEST_EQ(ret.first, consume_result::got_push); BOOST_TEST_EQ(ret.second, 16u); - BOOST_TEST(push_resp.has_value()); std::vector expected{ {type::push, 2u, 0u, "" }, {type::simple_string, 1u, 1u, "one"}, {type::simple_string, 1u, 1u, "two"}, }; - BOOST_TEST_ALL_EQ(push_resp->begin(), push_resp->end(), expected.begin(), expected.end()); + check_response(push_resp, expected); BOOST_TEST_NOT(item1.done); BOOST_TEST_NOT(item2.done); @@ -400,11 +413,10 @@ void test_mix_responses_pushes() ret = mpx.consume_next(response1_buffer, ec); BOOST_TEST_EQ(ret.first, consume_result::got_response); BOOST_TEST_EQ(ret.second, 18u); - BOOST_TEST(item1.resp.has_value()); expected = { {type::blob_string, 1u, 0u, "Hello world"}, }; - BOOST_TEST_ALL_EQ(item1.resp->begin(), item1.resp->end(), expected.begin(), expected.end()); + check_response(item1.resp, expected); BOOST_TEST(item1.done); BOOST_TEST_NOT(item2.done); @@ -413,7 +425,6 @@ void test_mix_responses_pushes() ret = mpx.consume_next(push2_buffer, ec); BOOST_TEST_EQ(ret.first, consume_result::got_push); BOOST_TEST_EQ(ret.second, 19u); - BOOST_TEST(push_resp.has_value()); expected = { {type::push, 2u, 0u, "" }, {type::simple_string, 1u, 1u, "one" }, @@ -422,7 +433,7 @@ void test_mix_responses_pushes() {type::simple_string, 1u, 1u, "other"}, {type::simple_string, 1u, 1u, "push" }, }; - BOOST_TEST_ALL_EQ(push_resp->begin(), push_resp->end(), expected.begin(), expected.end()); + check_response(push_resp, expected); BOOST_TEST(item1.done); BOOST_TEST_NOT(item2.done); @@ -431,11 +442,10 @@ void test_mix_responses_pushes() ret = mpx.consume_next(response2_buffer, ec); BOOST_TEST_EQ(ret.first, consume_result::got_response); BOOST_TEST_EQ(ret.second, 14u); - BOOST_TEST(item2.resp.has_value()); expected = { {type::blob_string, 1u, 0u, "Response"}, }; - BOOST_TEST_ALL_EQ(item2.resp->begin(), item2.resp->end(), expected.begin(), expected.end()); + check_response(item2.resp, expected); BOOST_TEST(item1.done); BOOST_TEST(item2.done); @@ -449,6 +459,244 @@ void test_mix_responses_pushes() BOOST_TEST_EQ(usg.push_bytes_received, push1_buffer.size() + push2_buffer.size()); } +// Cancellation cases +// If the request is waiting, we just remove it +void test_cancel_waiting() +{ + // Setup + multiplexer mpx; + auto item1 = std::make_unique(); + auto item2 = std::make_unique(); + mpx.add(item1->elem_ptr); + mpx.add(item2->elem_ptr); + + // Cancel the first request + mpx.cancel(item1->elem_ptr); + item1.reset(); // Verify we don't reference this item anyhow + + // We can progress the other request normally + BOOST_TEST_EQ(mpx.prepare_write(), 1u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); + error_code ec; + auto res = mpx.consume_next("$11\r\nHello world\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST(item2->done); + const node expected[] = { + {type::blob_string, 1u, 0u, "Hello world"}, + }; + check_response(item2->resp, expected); +} + +// If the request is staged, we mark it as abandoned +void test_cancel_staged() +{ + // Setup + multiplexer mpx; + auto item1 = std::make_unique(); + auto item2 = std::make_unique(); + mpx.add(item1->elem_ptr); + mpx.add(item2->elem_ptr); + + // A write starts + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + + // Cancel the first request + mpx.cancel(item1->elem_ptr); + item1.reset(); // Verify we don't reference this item anyhow + + // The write gets confirmed + BOOST_TEST_EQ(mpx.commit_write(), 0u); + + // The cancelled request's response arrives. It gets discarded + error_code ec; + auto res = mpx.consume_next("+Goodbye\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST_NOT(item2->done); + + // The 2nd request's response arrives. It gets parsed successfully + res = mpx.consume_next("$11\r\nHello world\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST(item2->done); + const node expected[] = { + {type::blob_string, 1u, 0u, "Hello world"}, + }; + check_response(item2->resp, expected); +} + +// If the request is staged but didn't expect a response, we remove it +void test_cancel_staged_command_without_response() +{ + // Setup + multiplexer mpx; + auto item1 = std::make_unique(false); + auto item2 = std::make_unique(); + mpx.add(item1->elem_ptr); + mpx.add(item2->elem_ptr); + + // A write starts + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + + // Cancel the first request + mpx.cancel(item1->elem_ptr); + item1.reset(); // Verify we don't reference this item anyhow + + // The write gets confirmed + BOOST_TEST_EQ(mpx.commit_write(), 1u); + + // The 2nd request's response arrives. It gets parsed successfully + error_code ec; + auto res = mpx.consume_next("$11\r\nHello world\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST(item2->done); + const node expected[] = { + {type::blob_string, 1u, 0u, "Hello world"}, + }; + check_response(item2->resp, expected); +} + +// If the request is written, we mark it as abandoned +void test_cancel_written() +{ + // Setup + multiplexer mpx; + auto item1 = std::make_unique(); + auto item2 = std::make_unique(); + mpx.add(item1->elem_ptr); + mpx.add(item2->elem_ptr); + + // A write succeeds + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); + + // Cancel the first request + mpx.cancel(item1->elem_ptr); + item1.reset(); // Verify we don't reference this item anyhow + + // The cancelled request's response arrives. It gets discarded + error_code ec; + auto res = mpx.consume_next("+Goodbye\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST_NOT(item2->done); + + // The 2nd request's response arrives. It gets parsed successfully + res = mpx.consume_next("$11\r\nHello world\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST(item2->done); + const node expected[] = { + {type::blob_string, 1u, 0u, "Hello world"}, + }; + check_response(item2->resp, expected); +} + +// Having a written request for which part of its response +// has been received doesn't cause trouble +void test_cancel_written_half_parsed_response() +{ + // Setup + request req; + req.push("PING", "value1"); + req.push("PING", "value2"); + req.push("PING", "value3"); + multiplexer mpx; + auto item1 = std::make_unique(std::move(req)); + auto item2 = std::make_unique(); + mpx.add(item1->elem_ptr); + mpx.add(item2->elem_ptr); + + // A write succeeds + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); + + // Get the response for the 1st command in req1 + error_code ec; + auto res = mpx.consume_next("+Goodbye\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST_NOT(item1->done); + BOOST_TEST_EQ(ec, error_code()); + + // Get a partial response for the 2nd command in req1 + res = mpx.consume_next("*2\r\n$4\r\nsome\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::needs_more); + BOOST_TEST_NOT(item1->done); + BOOST_TEST_EQ(ec, error_code()); + + // Cancel the first request + mpx.cancel(item1->elem_ptr); + item1.reset(); // Verify we don't reference this item anyhow + + // Get the rest of the response for the 2nd command in req1 + res = mpx.consume_next("*2\r\n$4\r\nsome\r\n$4\r\ndata\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST_NOT(item2->done); + BOOST_TEST_EQ(ec, error_code()); + + // Get the response for the 3rd command in req1 + res = mpx.consume_next("+last\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST_NOT(item2->done); + BOOST_TEST_EQ(ec, error_code()); + + // Get the response for the 2nd request + res = mpx.consume_next("$11\r\nHello world\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST(item2->done); + const node expected[] = { + {type::blob_string, 1u, 0u, "Hello world"}, + }; + check_response(item2->resp, expected); +} + +// If an abandoned request receives a NULL or an error, nothing happens +// (regression check) +void test_cancel_written_null_error() +{ + // Setup + request req; + req.push("PING", "value1"); + req.push("PING", "value2"); + req.push("PING", "value3"); + multiplexer mpx; + auto item1 = std::make_unique(std::move(req)); + auto item2 = std::make_unique(); + mpx.add(item1->elem_ptr); + mpx.add(item2->elem_ptr); + + // A write succeeds + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); + + // Cancel the first request + mpx.cancel(item1->elem_ptr); + item1.reset(); // Verify we don't reference this item anyhow + + // The cancelled request's response arrives. It contains NULLs and errors. + // We ignore them + error_code ec; + auto res = mpx.consume_next("-ERR wrong command\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_NOT(item2->done); + + res = mpx.consume_next("!3\r\nBad\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_NOT(item2->done); + + res = mpx.consume_next("_\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST_EQ(ec, error_code()); + BOOST_TEST_NOT(item2->done); + + // The 2nd request's response arrives. It gets parsed successfully + res = mpx.consume_next("$11\r\nHello world\r\n", ec); + BOOST_TEST_EQ(res.first, consume_result::got_response); + BOOST_TEST(item2->done); + const node expected[] = { + {type::blob_string, 1u, 0u, "Hello world"}, + }; + check_response(item2->resp, expected); +} + // Cancellation on connection lost void test_cancel_on_connection_lost() { @@ -499,6 +747,57 @@ void test_cancel_on_connection_lost() BOOST_TEST(item_waiting2.done); } +// cancel_on_connection_lost cleans up any abandoned request, +// regardless of their configuration +void test_cancel_on_connection_lost_abandoned() +{ + // Setup + multiplexer mpx; + auto item_written1 = std::make_unique(); + auto item_written2 = std::make_unique(); + auto item_staged1 = std::make_unique(); + auto item_staged2 = std::make_unique(); + + // Different items have different configurations + // (note that these are all true by default) + item_written1->req.get_config().cancel_if_unresponded = false; + item_staged1->req.get_config().cancel_if_unresponded = false; + + // Make each item reach the state it should be in + mpx.add(item_written1->elem_ptr); + mpx.add(item_written2->elem_ptr); + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + BOOST_TEST_EQ(mpx.commit_write(), 0u); + + mpx.add(item_staged1->elem_ptr); + mpx.add(item_staged2->elem_ptr); + BOOST_TEST_EQ(mpx.prepare_write(), 2u); + + // Check that we got it right + BOOST_TEST(item_written1->elem_ptr->is_written()); + BOOST_TEST(item_written2->elem_ptr->is_written()); + BOOST_TEST(item_staged1->elem_ptr->is_staged()); + BOOST_TEST(item_staged2->elem_ptr->is_staged()); + + // Cancel all of the requests + mpx.cancel(item_written1->elem_ptr); + mpx.cancel(item_written2->elem_ptr); + mpx.cancel(item_staged1->elem_ptr); + mpx.cancel(item_staged2->elem_ptr); + item_written1.reset(); + item_written2.reset(); + item_staged1.reset(); + item_staged2.reset(); + + // Trigger a connection lost event + mpx.cancel_on_conn_lost(); + + // This should have removed all requests, regardless of their config. + // If we restore the connection and try a write, nothing gets written. + mpx.reset(); + BOOST_TEST_EQ(mpx.prepare_write(), 0u); +} + // The test below fails. Uncomment when this is fixed: // https://github.com/boostorg/redis/issues/307 // void test_cancel_on_connection_lost_half_parsed_response() @@ -533,17 +832,12 @@ void test_cancel_on_connection_lost() // BOOST_TEST_EQ(ec, error_code()); // // Check the response -// BOOST_TEST(item.resp.has_value()); // const node expected[] = { // {type::array, 2u, 0u, "" }, // {type::simple_string, 1u, 1u, "hello"}, // {type::simple_string, 1u, 1u, "world"}, // }; -// BOOST_TEST_ALL_EQ( -// item.resp->begin(), -// item.resp->end(), -// std::begin(expected), -// std::end(expected)); +// check_response(item.resp, expected); // } // Resetting works @@ -580,15 +874,10 @@ void test_reset() ret = mpx.consume_next(response_buffer, ec); BOOST_TEST_EQ(ret.first, consume_result::got_response); BOOST_TEST_EQ(ret.second, response_buffer.size()); - BOOST_TEST(item2.resp.has_value()); const node expected[] = { {type::blob_string, 1u, 0u, "Hello world"}, }; - BOOST_TEST_ALL_EQ( - item2.resp->begin(), - item2.resp->end(), - std::begin(expected), - std::end(expected)); + check_response(item2.resp, expected); BOOST_TEST(item2.done); } @@ -605,7 +894,14 @@ int main() test_push_heuristics_request_without_response(); test_push_heuristics_request_waiting(); test_mix_responses_pushes(); + test_cancel_waiting(); + test_cancel_staged(); + test_cancel_staged_command_without_response(); + test_cancel_written(); + test_cancel_written_half_parsed_response(); + test_cancel_written_null_error(); test_cancel_on_connection_lost(); + test_cancel_on_connection_lost_abandoned(); // test_cancel_on_connection_lost_half_parsed_response(); test_reset();