2
0
mirror of https://github.com/boostorg/cobalt.git synced 2026-01-19 04:02:16 +00:00

channel handles close & cancel can occur before await_suspend.

This commit is contained in:
Klemens Morgenstern
2025-06-27 14:12:24 +08:00
parent effdf1cba2
commit f24e917d9c
5 changed files with 196 additions and 9 deletions

View File

@@ -96,7 +96,7 @@ struct channel
}
struct cancel_impl;
bool await_ready() { return !chn->buffer_.empty(); }
bool await_ready() { return !chn->buffer_.empty() || chn->is_closed_; }
template<typename Promise>
BOOST_COBALT_MSVC_NOINLINE
std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
@@ -140,7 +140,7 @@ struct channel
struct cancel_impl;
bool await_ready() { return !chn->buffer_.full(); }
bool await_ready() { return !chn->buffer_.full() || chn->is_closed_; }
template<typename Promise>
BOOST_COBALT_MSVC_NOINLINE
std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
@@ -257,9 +257,12 @@ struct channel<void>
}
struct cancel_impl;
bool await_ready() { return (chn->n_ > 0); }
template<typename Promise>
bool await_ready()
{
return (chn->n_ > 0) || chn->is_closed_;
}
template<typename Promise>
BOOST_COBALT_MSVC_NOINLINE
std::coroutine_handle<void> await_suspend(std::coroutine_handle<Promise> h);
BOOST_COBALT_DECL void await_resume();
@@ -297,7 +300,7 @@ struct channel<void>
struct cancel_impl;
bool await_ready()
{
return chn->n_ < chn->limit_;
return chn->n_ < chn->limit_ || chn->is_closed_;
}
template<typename Promise>

View File

@@ -96,6 +96,9 @@ template<typename T>
template<typename Promise>
std::coroutine_handle<void> channel<T>::read_op::await_suspend(std::coroutine_handle<Promise> h)
{
if (cancelled)
return h; // already interrupted.
if constexpr (requires (Promise p) {p.get_cancellation_slot();})
if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
cancel_slot.emplace<cancel_impl>(this);
@@ -162,8 +165,17 @@ system::result<T> channel<T>::read_op::await_resume(const struct as_result_tag &
if (cancel_slot.is_connected())
cancel_slot.clear();
if (chn->is_closed_ && chn->buffer_.empty() && !direct)
{
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
return {system::in_place_error, asio::error::broken_pipe, &loc};
}
if (cancelled)
return {system::in_place_error, asio::error::operation_aborted};
{
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
return {system::in_place_error, asio::error::operation_aborted, &loc};
}
T value = chn->buffer_.empty() ? std::move(*direct) : std::move(chn->buffer_.front());
if (!chn->buffer_.empty())
@@ -207,6 +219,10 @@ template<typename T>
template<typename Promise>
std::coroutine_handle<void> channel<T>::write_op::await_suspend(std::coroutine_handle<Promise> h)
{
if (cancelled)
return h; // already interrupted.
if constexpr (requires (Promise p) {p.get_cancellation_slot();})
if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
cancel_slot.emplace<cancel_impl>(this);
@@ -263,8 +279,19 @@ system::result<void> channel<T>::write_op::await_resume(const struct as_result_
{
if (cancel_slot.is_connected())
cancel_slot.clear();
if (chn->is_closed_)
{
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
return {system::in_place_error, asio::error::broken_pipe, &loc};
}
if (cancelled)
return {system::in_place_error, asio::error::operation_aborted};
{
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
return {system::in_place_error, asio::error::operation_aborted, &loc};
}
if (!direct)
{
@@ -326,6 +353,10 @@ struct channel<void>::write_op::cancel_impl
template<typename Promise>
std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine_handle<Promise> h)
{
if (cancelled)
return h; // already interrupted.
if constexpr (requires (Promise p) {p.get_cancellation_slot();})
if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
cancel_slot.emplace<cancel_impl>(this);
@@ -362,6 +393,9 @@ std::coroutine_handle<void> channel<void>::read_op::await_suspend(std::coroutine
template<typename Promise>
std::coroutine_handle<void> channel<void>::write_op::await_suspend(std::coroutine_handle<Promise> h)
{
if (cancelled)
return h; // already interrupted.
if constexpr (requires (Promise p) {p.get_cancellation_slot();})
if ((cancel_slot = h.promise().get_cancellation_slot()).is_connected())
cancel_slot.emplace<cancel_impl>(this);

View File

@@ -49,8 +49,17 @@ system::result<void> channel<void>::read_op::await_resume(const struct as_resul
if (cancel_slot.is_connected())
cancel_slot.clear();
if (chn->is_closed_)
{
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
return {system::in_place_error, asio::error::broken_pipe, &loc};
}
if (cancelled)
return {system::in_place_error, asio::error::operation_aborted};
{
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
return {system::in_place_error, asio::error::operation_aborted, &loc};
}
if (!direct)
chn->n_--;
@@ -83,8 +92,18 @@ system::result<void> channel<void>::write_op::await_resume(const struct as_resul
{
if (cancel_slot.is_connected())
cancel_slot.clear();
if (chn->is_closed_)
{
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
return {system::in_place_error, asio::error::broken_pipe, &loc};
}
if (cancelled)
return {system::in_place_error, asio::error::operation_aborted};
{
constexpr static boost::source_location loc{BOOST_CURRENT_LOCATION};
return {system::in_place_error, asio::error::operation_aborted, &loc};
}
if (!direct)
chn->n_++;

View File

@@ -393,4 +393,92 @@ CO_TEST_CASE(interrupt_void_1)
BOOST_CHECK(rl == 1);
}
cobalt::promise<void> do_write(cobalt::channel<void> & c, int times = 1)
{
while (times --> 0)
co_await c.write();
};
CO_TEST_CASE(interrupt_0_void)
{
cobalt::channel<void> c{0};
auto w = do_write(c);
BOOST_CHECK(!w.ready());
auto [ec] = co_await cobalt::as_tuple(test_interrupt(c.read()));
BOOST_CHECK_MESSAGE(ec == asio::error::operation_aborted, ec.to_string());
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(!w.ready());
co_await c.read();
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(w.ready());
}
CO_TEST_CASE(interrupt_1_void)
{
cobalt::channel<void> c{1};
auto w = do_write(c, 2);
BOOST_CHECK(!w.ready());
auto [ec] = co_await cobalt::as_tuple(test_interrupt(c.read()));
BOOST_CHECK_MESSAGE(ec == asio::error::operation_aborted, ec.to_string());
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(!w.ready());
co_await c.read();
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(w.ready());
co_await c.read();
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(w.ready());
}
cobalt::promise<void> do_write(cobalt::channel<int> & c, int times = 1)
{
int i = 0;
while (times --> 0)
co_await c.write(i++);
};
CO_TEST_CASE(interrupt_0_int)
{
cobalt::channel<int> c{0};
auto w = do_write(c);
BOOST_CHECK(!w.ready());
auto [ec, i] = co_await cobalt::as_tuple(test_interrupt(c.read()));
BOOST_CHECK_MESSAGE(ec == asio::error::operation_aborted, ec.to_string());
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(!w.ready());
i = co_await c.read();
BOOST_CHECK_EQUAL(i, 0);
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(w.ready());
}
CO_TEST_CASE(interrupt_1_int)
{
cobalt::channel<int> c{1};
auto w = do_write(c, 2);
BOOST_CHECK(!w.ready());
auto [ec, i] = co_await cobalt::as_tuple(test_interrupt(c.read()));
BOOST_CHECK_MESSAGE(ec == asio::error::operation_aborted, ec.to_string());
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(!w.ready());
i = co_await c.read();
BOOST_CHECK_EQUAL(i, 0);
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(w.ready());
i = co_await c.read();
BOOST_CHECK_EQUAL(i, 1);
co_await asio::post(co_await this_coro::executor);
BOOST_CHECK(w.ready());
}
}

View File

@@ -173,4 +173,47 @@ struct posted_handle
}
};
template<boost::cobalt::awaitable_type Aw>
struct test_interrupt
{
Aw aw;
test_interrupt(Aw && aw) : aw(std::move(aw)) {}
bool await_ready()
{
auto res = aw.await_ready();
aw.interrupt_await();
return res;
}
template<typename T>
auto await_suspend(std::coroutine_handle<T> h)
{
using type = decltype(aw.await_suspend(h));
if constexpr (std::is_void_v<type>)
{
aw.await_suspend(h);
aw.interrupt_await();
}
else
{
auto r = aw.await_suspend(h);
aw.interrupt_await();
return r;
}
}
template<typename T>
auto await_resume(const T & tag)
{
return aw.await_resume(tag);
}
auto await_resume()
{
return aw.await_resume();
}
};
#endif //BOOST_COBALT_TEST2_HPP