2
0
mirror of https://github.com/boostorg/cobalt.git synced 2026-01-31 08:02:16 +00:00
Files
cobalt/src/channel.cpp
2025-08-11 16:03:57 +08:00

143 lines
3.3 KiB
C++

//
// Copyright (c) 2022 Klemens Morgenstern (klemens.morgenstern@gmx.net)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/cobalt/channel.hpp>
#include <boost/asio/defer.hpp>
namespace boost::cobalt
{
// Tears down the channel by resetting the `awaited_from` handle of every
// operation still sitting in the read queue and then in the write queue.
channel<void>::~channel()
{
  // The loop only ever looks at the front element and never unlinks it
  // itself, so it depends on reset() taking that element out of the queue.
  // NOTE(review): presumably awaited_from owns the suspended coroutine and
  // destroying it destroys the op, which then unlinks itself - confirm
  // against the op definition in channel.hpp.
  const auto drop_waiters = [](auto & waiters)
  {
    while (!waiters.empty())
      waiters.front().awaited_from.reset();
  };

  drop_waiters(read_queue_);   // readers first ...
  drop_waiters(write_queue_);  // ... then writers
}
// Marks the channel as closed and flushes both wait queues.
//
// Every queued op is unlinked and flagged as cancelled (writers are flagged
// as closed as well), its cancellation slot is cleared and, if it still holds
// a waiter, that waiter is submitted to the channel's executor with
// asio::defer. Readers are processed before writers.
void channel<void>::close()
{
  is_closed_ = true;

  // Shared tail of both loops: drop the cancellation handler, then hand the
  // waiter (if any) to the executor instead of resuming it inline.
  const auto release = [this](auto & pending)
  {
    pending.cancel_slot.clear();
    if (pending.awaited_from)
      asio::defer(executor_, std::move(pending.awaited_from));
  };

  while (!read_queue_.empty())
  {
    auto & reader = read_queue_.front();
    reader.unlink();  // unlinking the front is what makes the loop advance
    reader.cancelled = true;
    release(reader);
  }

  while (!write_queue_.empty())
  {
    auto & writer = write_queue_.front();
    writer.unlink();
    writer.cancelled = true;
    // Only writers carry a per-op `closed` flag; readers consult the
    // channel's is_closed_ in their await_resume instead.
    writer.closed = true;
    release(writer);
  }
}
// Completes a read and reports the outcome as a system::result<void>.
//
// Returns asio::error::broken_pipe when the channel is closed,
// asio::error::operation_aborted when this op was cancelled, and a value
// result otherwise. The closed check comes first, so a cancelled read on a
// closed channel reports broken_pipe. On success the first queued writer is
// posted to the channel's executor, provided it reports itself ready.
system::result<void> channel<void>::read_op::await_resume(const struct as_result_tag &)
{
  // The wait is over; drop the cancellation handler if one is still installed.
  if (cancel_slot.is_connected())
    cancel_slot.clear();

  if (chn->is_closed_)
  {
    constexpr static boost::source_location closed_at{BOOST_CURRENT_LOCATION};
    return {system::in_place_error, asio::error::broken_pipe, &closed_at};
  }

  if (cancelled)
  {
    constexpr static boost::source_location cancelled_at{BOOST_CURRENT_LOCATION};
    return {system::in_place_error, asio::error::operation_aborted, &cancelled_at};
  }

  // NOTE(review): n_ looks like the count of buffered items, and `direct`
  // like "the counter was already adjusted on this op's behalf" - confirm in
  // channel.hpp.
  if (!direct)
    --chn->n_;

  // No writer is waiting: nothing else to do.
  if (chn->write_queue_.empty())
    return {system::in_place_value};

  auto & writer = chn->write_queue_.front();
  BOOST_ASSERT(chn->read_queue_.empty());

  // The writer cannot proceed yet, so it stays queued.
  if (!writer.await_ready())
    return {system::in_place_value};

  writer.unlink();
  const bool writer_dead = writer.cancelled || writer.closed;
  if (!writer_dead)
  {
    // Account for the write here; `direct` makes the writer's own
    // await_resume skip its increment.
    writer.direct = true;
    ++chn->n_;
  }
  BOOST_ASSERT(writer.awaited_from);
  asio::post(chn->executor_, std::move(writer.awaited_from));

  return {system::in_place_value};
}
// Plain (non-tagged) completion of a read: runs the as_result_tag overload
// and calls value() on its result, passing this op's stored `loc`.
// NOTE(review): value() is expected to throw when the result holds an error,
// per Boost.System's result<void> - confirm.
void channel<void>::read_op::await_resume()
{
  auto outcome = await_resume(as_result_tag{});
  outcome.value(loc);
}
// Tuple-style completion of a read: runs the as_result_tag overload and
// returns only its error code, wrapped in a one-element tuple.
std::tuple<system::error_code> channel<void>::read_op::await_resume(const struct as_tuple_tag & )
{
  const auto outcome = await_resume(as_result_tag{});
  return outcome.error();
}
// Completes a write and reports the outcome as a system::result<void>.
//
// Returns asio::error::broken_pipe when this op was flagged as closed,
// asio::error::operation_aborted when it was cancelled, and a value result
// otherwise. The closed check comes first. On success the first queued
// reader is posted to the channel's executor, provided it reports itself
// ready.
system::result<void> channel<void>::write_op::await_resume(const struct as_result_tag &)
{
  // The wait is over; drop the cancellation handler if one is still installed.
  if (cancel_slot.is_connected())
    cancel_slot.clear();

  if (closed)
  {
    constexpr static boost::source_location closed_at{BOOST_CURRENT_LOCATION};
    return {system::in_place_error, asio::error::broken_pipe, &closed_at};
  }

  if (cancelled)
  {
    constexpr static boost::source_location cancelled_at{BOOST_CURRENT_LOCATION};
    return {system::in_place_error, asio::error::operation_aborted, &cancelled_at};
  }

  // When `direct` is set, the reader that woke this op has already bumped
  // the counter, so it must not be bumped again.
  // NOTE(review): n_ looks like the count of buffered items - confirm in
  // channel.hpp.
  if (!direct)
    ++chn->n_;

  // No reader is waiting: nothing else to do.
  if (chn->read_queue_.empty())
    return {system::in_place_value};

  auto & reader = chn->read_queue_.front();
  BOOST_ASSERT(chn->write_queue_.empty());

  // The reader cannot proceed yet, so it stays queued.
  if (!reader.await_ready())
    return {system::in_place_value};

  reader.unlink();
  BOOST_ASSERT(reader.awaited_from);
  asio::post(chn->executor_, std::move(reader.awaited_from));

  return {system::in_place_value};
}
// Plain (non-tagged) completion of a write: runs the as_result_tag overload
// and calls value() on its result, passing this op's stored `loc`.
// NOTE(review): value() is expected to throw when the result holds an error,
// per Boost.System's result<void> - confirm.
void channel<void>::write_op::await_resume()
{
  auto outcome = await_resume(as_result_tag{});
  outcome.value(loc);
}
// Tuple-style completion of a write: runs the as_result_tag overload and
// returns only its error code, wrapped in a one-element tuple.
std::tuple<system::error_code> channel<void>::write_op::await_resume(const struct as_tuple_tag & )
{
  const auto outcome = await_resume(as_result_tag{});
  return outcome.error();
}
}