diff --git a/example/Jamfile.v2 b/example/Jamfile.v2 index e7492b8..87d67e4 100644 --- a/example/Jamfile.v2 +++ b/example/Jamfile.v2 @@ -81,3 +81,7 @@ exe circle #exe echosse # : echosse.cpp # ; + +exe wait_on_sender + : wait_on_sender.cpp + ; diff --git a/example/wait_on_sender.cpp b/example/wait_on_sender.cpp new file mode 100644 index 0000000..22826a4 --- /dev/null +++ b/example/wait_on_sender.cpp @@ -0,0 +1,386 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +/////////////////////////////////////////// +// Implementation of P2300 fiber_context integration +// +// stdexec::fiber_sender(scheduler auto s, invocable auto f) -> sender_of>; +// - Runs f() in a fiber that always resumes on the the specified scheduler. +// +// stdexec::fiber_wait(sender auto s) -> std::optional> +// - suspends the current fiber and launches the async operation, s. +// - resumes the current fiber when 's' completes and returns the result of 's' + +namespace stdexec { + +struct __fiber_op_base; + +inline thread_local __fiber_op_base* __current_fiber_op = nullptr; + +struct __fiber_op_base : __immovable { + // Function that should be called to schedule resumption of the + // child fiber. Only valid to call when the child fiber is currently + // suspended. + using __schedule_fn = void(__fiber_op_base*) noexcept; + __schedule_fn* __schedule_; + + // When the child fiber is suspended this holds the fiber_context + // of the child fiber. When the child fiber is running, this holds + // the fiber_context of the fiber that last resumed this fiber. + std::fiber_context __child_or_main_fiber_; + + // The __fiber_op_base associated with the main fiber_context when + // the child fiber is active. + __fiber_op_base* __main_fiber_op_ = nullptr; + + void __resume_child() noexcept { + std::fiber_context __child = std::move(__child_or_main_fiber_); + [[maybe_unused]] std::fiber_context __result = + std::move(__child).resume_with([this](std::fiber_context __main) noexcept -> std::fiber_context { + assert(__child_or_main_fiber_.empty()); + __child_or_main_fiber_ = std::move(__main); + + assert(__main_fiber_op_ == nullptr); + __main_fiber_op_ = std::exchange(__current_fiber_op, this); + + return std::fiber_context(); + }); + assert(__result.empty()); + } + + template + void __resume_main_with(Func&& func) noexcept { + std::fiber_context __main = std::move(__child_or_main_fiber_); + [[maybe_unused]] std::fiber_context __result = + std::move(__main).resume_with([&](std::fiber_context __child) noexcept -> std::fiber_context { + assert(__child_or_main_fiber_.empty()); + __child_or_main_fiber_ = std::move(__child); + + assert(__current_fiber_op == this); + __current_fiber_op = std::exchange(__main_fiber_op_, nullptr); + + static_assert(noexcept(std::invoke(static_cast(func)))); + std::invoke(static_cast(func)); + + return std::fiber_context(); + }); + assert(__result.empty()); + } +}; + +namespace __fiber_sender { + +template +struct __fiber_op_state { + struct __t : private __fiber_op_base { + using __result_type = std::invoke_result_t>; + public: + using __id = __fiber_op_state; + __t(_Func&& __func, _Scheduler __sched, _Receiver __r) + : __receiver_(std::move(__r)) + , __scheduler_(std::move(__sched)) + { + this->__schedule_ = [](__fiber_op_base* __base) noexcept { + static_cast<__t*>(__base)->__schedule_resume(); + }; + this->__child_or_main_fiber_ = + std::fiber_context([__func=std::forward<_Func>(__func), this](std::fiber_context) noexcept -> std::fiber_context { + if (__started_) { + constexpr bool __is_nothrow = std::is_nothrow_invocable_v>; + try { + __result_.template emplace<1>( + __conv{ + [&]() noexcept(__is_nothrow) -> __non_void_result_type { + if constexpr (std::is_void_v<__result_type>) { + std::invoke(std::move(__func)); + return {}; + } else { + return std::invoke(std::move(__func)); + } + }}); + } catch (...) { + __result_.template emplace<2>(std::current_exception()); + } + + __deliver_result(); + } + + __current_fiber_op = std::exchange(this->__main_fiber_op_, nullptr); + return std::move(this->__child_or_main_fiber_); + }); + } + + ~__t() { + if (!__started_) { + // Need to resume the child in order for it to run to completion + // and destroy itself. + this->__resume_child(); + } + assert(this->__child_or_main_fiber_.empty()); + assert(this->__main_fiber_op_ == nullptr); + } + + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__started_ = true; + __self.__schedule_resume(); + } + + private: + // Receiver passed to schedule() operation that resumes the fiber. + struct __schedule_receiver { + __t& __op_; + + friend void tag_invoke(set_value_t, __schedule_receiver&& __self) noexcept { + // Take a copy of '__op_' member, since call to 'reset()' will + // destroy __self. + auto& __op = __self.__op_; + __op.__schedule_op_.reset(); + __op.__resume_child(); + } + + template + [[noreturn]] friend void tag_invoke(set_error_t, __schedule_receiver&&, E&&) noexcept { + std::terminate(); + } + + [[noreturn]] friend void tag_invoke(set_stopped_t, __schedule_receiver&&) noexcept { + std::terminate(); + } + + // TODO: Should this return something that forwards queries to + // the fiber_op_state::receiver object? + friend empty_env tag_invoke(get_env_t, const __schedule_receiver&) noexcept { + return {}; + } + }; + + // Schedule a call to __resume_child() to occur on the associated scheduler's context. + // The child fiber must be suspended before calling this. + void __schedule_resume() noexcept { + stdexec::start( + __schedule_op_.emplace(__conv{[&]() { + return stdexec::connect( + stdexec::schedule(__scheduler_), + __schedule_receiver{*this}); + }})); + } + + // Call the fiber-op's receiver with the return value of 'func()' + // Must be called from the child fiber's context. + void __deliver_result() noexcept { + this->__resume_main_with([&]() noexcept { + // Take a copy of '*this' before resuming the child as resuming the child + // will potentially destroy the temporary lambda we are currently executing + // within. + auto& __self = *this; + + // Resume the child fiber to let it run to completion and destroy itself. + // This ensures that `child_or_main_fiber` is an empty fiber_context before + // we run the op-state destructor (which could technically be run at any + // time, from any thread after we invoke the completion handler). + __self.__resume_child(); + + // Then, once we are back on the main context, we can invoke the receiver + // with the result. + if (__self.__result_.index() == 2) { + stdexec::set_error(std::move(__self.__receiver_), std::get<2>(std::move(__self.__result_))); + } else { + if constexpr (std::is_void_v<__result_type>) { + stdexec::set_value(std::move(__self.__receiver_)); + } else { + stdexec::set_value(std::move(__self.__receiver_), std::get<1>(std::move(__self.__result_))); + } + } + }); + } + + struct __empty {}; + using __non_void_result_type = std::conditional_t, __empty, __result_type>; + + _Receiver __receiver_; + _Scheduler __scheduler_; + bool __started_ = false; + std::variant __result_; + + using __schedule_sender = decltype(stdexec::schedule(std::declval<_Scheduler&>())); + using __schedule_op_state = stdexec::connect_result_t<__schedule_sender, __schedule_receiver>; + std::optional<__schedule_op_state> __schedule_op_; + }; +}; + +template +struct __sender { + struct __t { + using __id = __sender; + + using is_sender = void; + + using completion_signatures = + std::conditional_t< + std::is_nothrow_invocable_v<_Func>, + completion_signatures)>, + completion_signatures), set_error_t(std::exception_ptr)>>; + + _Scheduler __scheduler_; + _Func __main_func_; + + template + requires std::move_constructible<_Func> + friend stdexec::__t<__fiber_op_state<_Func, _Scheduler, _Receiver>> tag_invoke(connect_t, __t&& __self, _Receiver __r) { + return {std::move(__self.__main_func_), std::move(__self.__scheduler_), std::move(__r)}; + } + + template + requires std::copy_constructible<_Func> + friend stdexec::__t<__fiber_op_state<_Func, _Scheduler, _Receiver>> tag_invoke(connect_t, const __t& __self, _Receiver __r) { + return {std::move(__self.__main_func_), std::move(__self.__scheduler_), std::move(__r)}; + } + }; +}; + +} // namespace __fiber_sender + +template +__t<__fiber_sender::__sender<_Scheduler, _Func>> let_fiber(_Scheduler __sched, _Func __func) { + return {std::move(__sched), std::move(__func)}; +} + +namespace __fiber_wait { + + template + struct __state { + using _Tuple = std::tuple<_Values...>; + std::variant __data_{}; + __fiber_op_base* __fiber_op_ = nullptr; + }; + + template + struct __receiver { + struct __t { + using __id = __receiver; + __state<_Values...>* __state_; + + void __finish() noexcept { + auto* __op = __state_->__fiber_op_; + __op->__schedule_(__op); + } + + template + void __set_error(_Error __err) noexcept { + if constexpr (__decays_to<_Error, std::exception_ptr>) + __state_->__data_.template emplace<2>((_Error&&) __err); + else if constexpr (__decays_to<_Error, std::error_code>) + __state_->__data_.template emplace<2>( + std::make_exception_ptr(std::system_error(__err))); + else + __state_->__data_.template emplace<2>(std::make_exception_ptr((_Error&&) __err)); + + __finish(); + } + + template + requires constructible_from, _As...> + friend void tag_invoke(stdexec::set_value_t, __t&& __rcvr, _As&&... __as) noexcept { + try { + __rcvr.__state_->__data_.template emplace<1>((_As&&) __as...); + __rcvr.__finish(); + } catch (...) { + __rcvr.__set_error(std::current_exception()); + } + } + + template + friend void tag_invoke(stdexec::set_error_t, __t&& __rcvr, _Error __err) noexcept { + __rcvr.__set_error((_Error&&) __err); + } + + friend void tag_invoke(stdexec::set_stopped_t __d, __t&& __rcvr) noexcept { + __rcvr.__state_->__data_.template emplace<3>(__d); + __rcvr.__finish(); + } + + friend empty_env tag_invoke(stdexec::get_env_t, const __t& __rcvr) noexcept { + return {}; + } + }; + }; + + struct fiber_wait_t { + template + using __receiver_t = __t<__sync_wait::__sync_wait_result_impl<_Sender, __q<__receiver>>>; + + template <__single_value_variant_sender _Sender> + requires sender_in<_Sender, empty_env> + && sender_to<_Sender, __receiver_t<_Sender>> + static auto __impl(_Sender&& __sndr) -> std::optional<__sync_wait::__sync_wait_result_t<_Sender>> { + auto* __fiber_op = __current_fiber_op; + if (__fiber_op == nullptr) { + std::terminate(); + } + + using state_t = __sync_wait::__sync_wait_result_impl<_Sender, __q<__state>>; + state_t __state{}; + __state.__fiber_op_ = __fiber_op; + + // Launch the sender with a continuation that will fill in a variant + // and notify a condition variable. + auto __op_state = connect((_Sender&&) __sndr, __receiver_t<_Sender>{&__state}); + + // Need to launch the sender only after suspending the current fiber. + __fiber_op->__resume_main_with([&]() noexcept { + start(__op_state); + }); + + if (__state.__data_.index() == 2) + std::rethrow_exception(std::move(std::get<2>(__state.__data_))); + + if (__state.__data_.index() == 3) + return std::nullopt; + + return std::move(std::get<1>(__state.__data_)); + } + + template <__single_value_variant_sender _Sender> + requires sender_in<_Sender, empty_env> + && sender_to<_Sender, __receiver_t<_Sender>> + __attribute__((always_inline)) auto operator()(_Sender&& __sndr) const -> std::optional<__sync_wait::__sync_wait_result_t<_Sender>> { + return __impl(static_cast<_Sender&&>(__sndr)); + } +}; + +} // namespace __fiber_wait + +using __fiber_wait::fiber_wait_t; +inline constexpr fiber_wait_t fiber_wait{}; + +} // namespace stdexec + +int fiber_main() noexcept { + auto [x] = *stdexec::fiber_wait(stdexec::just(123)); + return x; +} + +int main() { + using namespace stdexec; + std::optional> result = sync_wait( + let_value(get_scheduler(), [](auto sched) { + return let_fiber(sched, &fiber_main); + })); + return std::get<0>(result.value()); +}