2
0
mirror of https://github.com/boostorg/cobalt.git synced 2026-01-19 16:12:15 +00:00
Files
cobalt/example/python.cpp
2023-12-06 07:45:21 +08:00

368 lines
8.9 KiB
C++

// Copyright (c) 2023 Klemens D. Morgenstern
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <nanobind/nanobind.h>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/cobalt.hpp>
#include <thread>
/** In this example we'll connect cobalt and
* pythons asyncio using nanobind (a C++17 successor to pybind11).
*
*/
using namespace boost;
namespace py = nanobind;
// Small helper function to get the current event loop for python
py::object get_loop()
{
auto mod = py::module_::import_("asyncio");
auto getter = mod.attr("get_event_loop");
return getter();
}
// An asio::any_io_executor compatible wrapper around the event loop.
// all the query functions are for that compatibility.
struct python_executor
{
python_executor(py::object loop = ::get_loop()) noexcept
: m_ptr(loop.ptr())
{
// the asio::execution_context needs to present, we put it into the event loop.
if (!py::hasattr(loop, "__asio_execution_context"))
{
auto ptr = std::make_unique<asio::execution_context>();
context_ = ptr.get();
py::setattr(loop, "__asio_execution_context",
py::cast(std::move(ptr)));
}
else
context_ = py::cast<std::unique_ptr<asio::execution_context>&>(
py::getattr(loop, "__asio_execution_context")
).get();
}
asio::execution_context &query(asio::execution::context_t) const
{
return *context_;
}
static constexpr asio::execution::blocking_t
query(asio::execution::blocking_t) noexcept
{
return asio::execution::blocking.never;
}
// this function takes the function F and runs it on the event loop.
template<class F>
void
execute(F f) const
{
py::gil_scoped_acquire lock;
py::handle loop(m_ptr);
struct wrapper // override the const.
{
mutable F impl;
void operator()( ) const
{
std::move(impl)();
}
};
loop.attr("call_soon_threadsafe")(py::cpp_function(wrapper{std::move(f)}));
}
bool
operator==(python_executor const &other) const noexcept
{
return m_ptr == other.m_ptr;
}
bool
operator!=(python_executor const &other) const noexcept
{
return !(*this == other);
}
private:
PyObject *m_ptr;
asio::execution_context *context_;
};
// helper function so we can capture the currently active C++ exception into a python object.
py::object translate_current_exception()
{
py::object locals = py::dict();
locals["rethrow_"] = py::cpp_function([]{throw;});
PyRun_String(R"(
try:
rethrow_();
result = None
except Exception as e:
result = e;
)", Py_single_input, PyEval_GetGlobals(), locals.ptr());
return locals["result"];
}
struct py_coroutine
{
struct promise_type
{
constexpr static std::suspend_always initial_suspend() noexcept { return {};}
std::suspend_never final_suspend() noexcept
{
if (!done_)
return_value(py::none());
return {};
}
void unhandled_exception()
{
done_ = true;
loop.attr("call_soon")(
future.attr("set_exception"), translate_current_exception());
}
// cast errors will be caught by unhandled_exception.
template<typename T>
void return_value(T && value)
{
done_ = true;
loop.attr("call_soon")(
future.attr("set_result"), py::cast(std::forward<T>(value)));
}
template<typename T>
std::suspend_always yield_value(T && value)
{
loop.attr("call_soon")(
future.attr("set_result"), py::cast(std::forward<T>(value)));
owner->handle_.reset(this);
return {};
}
py::object loop;
py::object future;
py_coroutine get_return_object()
{
return py_coroutine{this};
}
bool done_ = false;
using executor_type = asio::any_io_executor;
executor_type exec_;
const executor_type & get_executor() { return exec_;}
py_coroutine * owner;
};
void initiate(py::object loop, py::object future)
{
if (!handle_)
throw std::invalid_argument("Awaited invalid coroutine");
if (handle_->done_)
throw py::stop_iteration("coroutine completed");
handle_->loop = std::move(loop);
handle_->future = std::move(future);
handle_->exec_ = python_executor(handle_->loop);
handle_->owner = this;
if (!cobalt::this_thread::has_executor())
cobalt::this_thread::set_executor(handle_->exec_);
std::coroutine_handle<promise_type>::from_promise(*handle_.release()).resume();
}
bool done() const
{
return !handle_ || handle_->done_;
}
py_coroutine(const py_coroutine & ) = delete;
py_coroutine(py_coroutine && ) = default;
private:
explicit py_coroutine(promise_type * pro) : handle_(pro)
{
}
struct deleter
{
void operator()(promise_type * p)
{
std::coroutine_handle<promise_type>::from_promise(*p).destroy();
}
};
std::unique_ptr<promise_type, deleter> handle_;
};
struct py_awaitable
{
py::object target;
py_awaitable(py::object target) : target(std::move(target)) {}
py::object result;
py::object exception;
constexpr bool await_ready() noexcept {return false;}
template<typename T>
bool await_suspend(std::coroutine_handle<T> h)
{
asio::any_io_executor exec;
if constexpr (requires (T & p) {p.get_executor();})
exec = h.promise().get_executor();
else
exec = cobalt::this_thread::get_executor();
auto loop = get_loop();
auto task = getattr(loop, "create_task")(target);
struct wrapper
{
asio::any_io_executor exec;
mutable cobalt::unique_handle<void> awaiter;
py_awaitable * res;
void operator()(py::object t) const
{
res->extract_result(t);
asio::dispatch(exec, std::move(awaiter));
}
};
if (py::cast<bool>(getattr(task, "done")()))
{
extract_result(task);
return false;
}
getattr(task, "add_done_callback")(py::cpp_function(wrapper{
std::move(exec),
cobalt::unique_handle<void>{h.address()},
this
}));
if constexpr (requires (T & p) {p.get_cancellation_slot();})
h.promise().get_cancellation_slot().
assign([c = getattr(task, "cancel")](asio::cancellation_type ct) {
py::gil_scoped_acquire lock;
c();
});
return true;
}
void extract_result(py::object task)
{
exception = getattr(task, "exception")();
if (exception.is_none())
result = getattr(task, "result")();
}
py::object await_resume()
{
if (!exception.is_none())
{
py::object locals = py::dict();
PyRun_String(
R"(def __rethrow__(ex):
raise ex;
)",
Py_single_input, PyEval_GetGlobals(), locals.ptr());
locals["__rethrow__"](exception);
}
return std::move(result);
}
};
cobalt::generator<int> test_generator()
{
for (auto i = 1; i < 10; i++)
co_yield i;
co_return 10;
}
cobalt::promise<int> test_promise()
{
asio::steady_timer tim{co_await cobalt::this_coro::executor,
std::chrono::milliseconds(100)};
co_await tim.async_wait(cobalt::use_op);
co_return 42;
}
cobalt::promise<void> await_py_coroutine(py_awaitable aw)
{
auto res = co_await std::move(aw);
printf("Python coroutine gave %s\n", py::str(res).c_str());
}
NB_MODULE(boost_cobalt_example_python, m)
{
namespace execution = asio::execution;
m.def("__rethrow_exception", &std::rethrow_exception);
py::class_<std::unique_ptr<asio::execution_context>>(m, "__asio__execution_context");
// use some inlined python to get a future
py::object locals = py::dict();
// language=python
PyRun_String(
R"(def __await_impl(self):
import asyncio
lp = asyncio.get_event_loop()
ft = lp.create_future()
self.initiate(lp, ft)
res = yield from ft
return res)", Py_single_input, PyEval_GetGlobals(), locals.ptr());
py::object await_impl = locals["__await_impl"];
// language=python
PyRun_String(
R"(async def __aiter_impl(self):
while not self.done:
yield await self
)", Py_single_input, PyEval_GetGlobals(), locals.ptr());
py::object aiter_impl = locals["__aiter_impl"];
py::class_<py_coroutine> ct(m, "__cobalt_coroutine");
ct.def("initiate", &py_coroutine::initiate)
.def_prop_ro("done", &py_coroutine::done);
setattr(ct, "__await__", await_impl);
setattr(ct, "__aiter__", aiter_impl);
m.def("test_generator",
[]() -> py_coroutine
{
BOOST_COBALT_FOR(auto v, test_generator())
co_yield v;
co_return py::none();
});
m.def("test_promise",
[]() -> py_coroutine
{
co_return co_await test_promise();
});
m.def("test_py_promise",
[](py::object obj) -> py_coroutine
{
co_await await_py_coroutine(std::move(obj));
co_return py::none();
});
}