2
0
mirror of https://github.com/boostorg/asio.git synced 2026-02-23 01:52:09 +00:00

Support immediate completion with reactor-based sockets and descriptors.

This commit is contained in:
Christopher Kohlhoff
2023-03-01 23:07:02 +11:00
parent 1f2f4bc862
commit 6550b831d3
39 changed files with 1121 additions and 120 deletions

View File

@@ -26,6 +26,7 @@
#include <boost/asio/detail/handler_work.hpp>
#include <boost/asio/detail/memory.hpp>
#include <boost/asio/detail/reactor_op.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/detail/push_options.hpp>
@@ -86,6 +87,9 @@ class descriptor_read_op
: public descriptor_read_op_base<MutableBufferSequence>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(descriptor_read_op);
descriptor_read_op(const boost::system::error_code& success_ec,
@@ -136,6 +140,37 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
descriptor_read_op* o(static_cast<descriptor_read_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder2<Handler, boost::system::error_code, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -86,6 +86,9 @@ class descriptor_write_op
: public descriptor_write_op_base<ConstBufferSequence>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(descriptor_write_op);
descriptor_write_op(const boost::system::error_code& success_ec,
@@ -136,6 +139,37 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
descriptor_write_op* o(static_cast<descriptor_write_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder2<Handler, boost::system::error_code, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -87,13 +87,31 @@ public:
per_descriptor_data& source_descriptor_data);
// Post a reactor operation for immediate completion.
void post_immediate_completion(reactor_op* op, bool is_continuation);
void post_immediate_completion(operation* op, bool is_continuation) const;
// Post a reactor operation for immediate completion.
BOOST_ASIO_DECL static void call_post_immediate_completion(
operation* op, bool is_continuation, const void* self);
// Start a new operation. The reactor operation will be performed when the
// given descriptor is flagged as ready, or an error has occurred.
BOOST_ASIO_DECL void start_op(int op_type, socket_type descriptor,
per_descriptor_data&, reactor_op* op,
bool is_continuation, bool allow_speculative);
bool is_continuation, bool allow_speculative,
void (*on_immediate)(operation*, bool, const void*),
const void* immediate_arg);
// Start a new operation. The reactor operation will be performed when the
// given descriptor is flagged as ready, or an error has occurred.
void start_op(int op_type, socket_type descriptor,
per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
{
start_op(op_type, descriptor, descriptor_data,
op, is_continuation, allow_speculative,
&epoll_reactor::call_post_immediate_completion, this);
}
// Cancel all operations associated with the given descriptor. The
// handlers associated with the descriptor will be invoked with the

View File

@@ -117,13 +117,30 @@ public:
per_descriptor_data& source_descriptor_data);
// Post a reactor operation for immediate completion.
void post_immediate_completion(operation* op, bool is_continuation);
void post_immediate_completion(operation* op, bool is_continuation) const;
// Post a reactor operation for immediate completion.
BOOST_ASIO_DECL static void call_post_immediate_completion(
operation* op, bool is_continuation, const void* self);
// Start a new operation. The reactor operation will be performed when the
// given descriptor is flagged as ready, or an error has occurred.
BOOST_ASIO_DECL void start_op(int op_type, socket_type descriptor,
per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative);
bool is_continuation, bool allow_speculative,
void (*on_immediate)(operation*, bool, const void*),
const void* immediate_arg);
// Start a new operation. The reactor operation will be performed when the
// given descriptor is flagged as ready, or an error has occurred.
void start_op(int op_type, socket_type descriptor,
per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
{
start_op(op_type, descriptor, descriptor_data,
op, is_continuation, allow_speculative,
&epoll_reactor::call_post_immediate_completion, this);
}
// Cancel all operations associated with the given descriptor. The
// handlers associated with the descriptor will be invoked with the

View File

@@ -16,9 +16,13 @@
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#include <boost/asio/detail/config.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/associated_immediate_executor.hpp>
#include <boost/asio/detail/handler_invoke_helpers.hpp>
#include <boost/asio/detail/initiate_dispatch.hpp>
#include <boost/asio/detail/type_traits.hpp>
#include <boost/asio/detail/work_dispatcher.hpp>
#include <boost/asio/execution/allocator.hpp>
#include <boost/asio/execution/blocking.hpp>
#include <boost/asio/execution/execute.hpp>
@@ -526,6 +530,34 @@ public:
}
};
template <typename Handler, typename IoExecutor>
class immediate_handler_work
{
public:
typedef handler_work<Handler, IoExecutor> handler_work_type;
explicit immediate_handler_work(BOOST_ASIO_MOVE_ARG(handler_work_type) w)
: handler_work_(BOOST_ASIO_MOVE_CAST(handler_work_type)(w))
{
}
template <typename Function>
void complete(Function& function, Handler& handler, const void* io_ex)
{
typedef typename associated_immediate_executor<Handler, IoExecutor>::type
immediate_ex_type;
immediate_ex_type immediate_ex = (get_associated_immediate_executor)(
handler, *static_cast<const IoExecutor*>(io_ex));
(initiate_dispatch_with_executor<immediate_ex_type>(immediate_ex))(
BOOST_ASIO_MOVE_CAST(Function)(function));
}
private:
handler_work_type handler_work_;
};
} // namespace detail
} // namespace asio
} // namespace boost

View File

@@ -28,7 +28,7 @@ namespace asio {
namespace detail {
inline void dev_poll_reactor::post_immediate_completion(
reactor_op* op, bool is_continuation)
operation* op, bool is_continuation) const
{
scheduler_.post_immediate_completion(op, is_continuation);
}

View File

@@ -149,15 +149,24 @@ void dev_poll_reactor::move_descriptor(socket_type,
{
}
void dev_poll_reactor::call_post_immediate_completion(
operation* op, bool is_continuation, const void* self)
{
static_cast<const dev_poll_reactor*>(self)->post_immediate_completion(
op, is_continuation);
}
void dev_poll_reactor::start_op(int op_type, socket_type descriptor,
dev_poll_reactor::per_descriptor_data&, reactor_op* op,
bool is_continuation, bool allow_speculative)
bool is_continuation, bool allow_speculative,
void (*on_immediate)(operation*, bool, const void*),
const void* immediate_arg)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (shutdown_)
{
post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
@@ -170,7 +179,7 @@ void dev_poll_reactor::start_op(int op_type, socket_type descriptor,
if (op->perform())
{
lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
}

View File

@@ -26,7 +26,7 @@ namespace asio {
namespace detail {
inline void epoll_reactor::post_immediate_completion(
operation* op, bool is_continuation)
operation* op, bool is_continuation) const
{
scheduler_.post_immediate_completion(op, is_continuation);
}

View File

@@ -230,14 +230,23 @@ void epoll_reactor::move_descriptor(socket_type,
source_descriptor_data = 0;
}
void epoll_reactor::call_post_immediate_completion(
operation* op, bool is_continuation, const void* self)
{
static_cast<const epoll_reactor*>(self)->post_immediate_completion(
op, is_continuation);
}
void epoll_reactor::start_op(int op_type, socket_type descriptor,
epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
bool is_continuation, bool allow_speculative,
void (*on_immediate)(operation*, bool, const void*),
const void* immediate_arg)
{
if (!descriptor_data)
{
op->ec_ = boost::asio::error::bad_descriptor;
post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
@@ -245,7 +254,7 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor,
if (descriptor_data->shutdown_)
{
post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
@@ -263,7 +272,7 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor,
if (descriptor_data->registered_events_ != 0)
descriptor_data->try_speculative_[op_type] = false;
descriptor_lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
}
@@ -271,7 +280,7 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor,
if (descriptor_data->registered_events_ == 0)
{
op->ec_ = boost::asio::error::operation_not_supported;
scheduler_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
@@ -290,7 +299,7 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor,
{
op->ec_ = boost::system::error_code(errno,
boost::asio::error::get_system_category());
scheduler_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
}
@@ -299,7 +308,7 @@ void epoll_reactor::start_op(int op_type, socket_type descriptor,
else if (descriptor_data->registered_events_ == 0)
{
op->ec_ = boost::asio::error::operation_not_supported;
scheduler_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
else

View File

@@ -29,7 +29,7 @@ namespace asio {
namespace detail {
inline void kqueue_reactor::post_immediate_completion(
operation* op, bool is_continuation)
operation* op, bool is_continuation) const
{
scheduler_.post_immediate_completion(op, is_continuation);
}

View File

@@ -191,14 +191,23 @@ void kqueue_reactor::move_descriptor(socket_type,
source_descriptor_data = 0;
}
void kqueue_reactor::call_post_immediate_completion(
operation* op, bool is_continuation, const void* self)
{
static_cast<const kqueue_reactor*>(self)->post_immediate_completion(
op, is_continuation);
}
void kqueue_reactor::start_op(int op_type, socket_type descriptor,
kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
bool is_continuation, bool allow_speculative,
void (*on_immediate)(operation*, bool, const void*),
const void* immediate_arg)
{
if (!descriptor_data)
{
op->ec_ = boost::asio::error::bad_descriptor;
post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
@@ -206,7 +215,7 @@ void kqueue_reactor::start_op(int op_type, socket_type descriptor,
if (descriptor_data->shutdown_)
{
post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
@@ -221,7 +230,7 @@ void kqueue_reactor::start_op(int op_type, socket_type descriptor,
if (op->perform())
{
descriptor_lock.unlock();
scheduler_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
@@ -240,7 +249,7 @@ void kqueue_reactor::start_op(int op_type, socket_type descriptor,
{
op->ec_ = boost::system::error_code(errno,
boost::asio::error::get_system_category());
scheduler_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}
}

View File

@@ -198,10 +198,10 @@ boost::system::error_code reactive_descriptor_service::cancel(
return ec;
}
void reactive_descriptor_service::start_op(
reactive_descriptor_service::implementation_type& impl,
int op_type, reactor_op* op, bool is_continuation,
bool is_non_blocking, bool noop)
void reactive_descriptor_service::do_start_op(implementation_type& impl,
int op_type, reactor_op* op, bool is_continuation, bool is_non_blocking,
bool noop, void (*on_immediate)(operation* op, bool, const void*),
const void* immediate_arg)
{
if (!noop)
{
@@ -209,13 +209,13 @@ void reactive_descriptor_service::start_op(
descriptor_ops::set_internal_non_blocking(
impl.descriptor_, impl.state_, true, op->ec_))
{
reactor_.start_op(op_type, impl.descriptor_,
impl.reactor_data_, op, is_continuation, is_non_blocking);
reactor_.start_op(op_type, impl.descriptor_, impl.reactor_data_, op,
is_continuation, is_non_blocking, on_immediate, immediate_arg);
return;
}
}
reactor_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
}
} // namespace detail

View File

@@ -234,10 +234,11 @@ boost::system::error_code reactive_socket_service_base::do_assign(
return ec;
}
void reactive_socket_service_base::start_op(
reactive_socket_service_base::base_implementation_type& impl,
int op_type, reactor_op* op, bool is_continuation,
bool is_non_blocking, bool noop)
void reactive_socket_service_base::do_start_op(
reactive_socket_service_base::base_implementation_type& impl, int op_type,
reactor_op* op, bool is_continuation, bool is_non_blocking, bool noop,
void (*on_immediate)(operation* op, bool, const void*),
const void* immediate_arg)
{
if (!noop)
{
@@ -245,31 +246,38 @@ void reactive_socket_service_base::start_op(
|| socket_ops::set_internal_non_blocking(
impl.socket_, impl.state_, true, op->ec_))
{
reactor_.start_op(op_type, impl.socket_,
impl.reactor_data_, op, is_continuation, is_non_blocking);
reactor_.start_op(op_type, impl.socket_, impl.reactor_data_, op,
is_continuation, is_non_blocking, on_immediate, immediate_arg);
return;
}
}
reactor_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
}
void reactive_socket_service_base::start_accept_op(
void reactive_socket_service_base::do_start_accept_op(
reactive_socket_service_base::base_implementation_type& impl,
reactor_op* op, bool is_continuation, bool peer_is_open)
reactor_op* op, bool is_continuation, bool peer_is_open,
void (*on_immediate)(operation* op, bool, const void*),
const void* immediate_arg)
{
if (!peer_is_open)
start_op(impl, reactor::read_op, op, is_continuation, true, false);
{
do_start_op(impl, reactor::read_op, op, is_continuation,
true, false, on_immediate, immediate_arg);
}
else
{
op->ec_ = boost::asio::error::already_open;
reactor_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
}
}
void reactive_socket_service_base::start_connect_op(
void reactive_socket_service_base::do_start_connect_op(
reactive_socket_service_base::base_implementation_type& impl,
reactor_op* op, bool is_continuation, const void* addr, size_t addrlen)
reactor_op* op, bool is_continuation, const void* addr, size_t addrlen,
void (*on_immediate)(operation* op, bool, const void*),
const void* immediate_arg)
{
if ((impl.state_ & socket_ops::non_blocking)
|| socket_ops::set_internal_non_blocking(
@@ -281,14 +289,14 @@ void reactive_socket_service_base::start_connect_op(
|| op->ec_ == boost::asio::error::would_block)
{
op->ec_ = boost::system::error_code();
reactor_.start_op(reactor::connect_op, impl.socket_,
impl.reactor_data_, op, is_continuation, false);
reactor_.start_op(reactor::connect_op, impl.socket_, impl.reactor_data_,
op, is_continuation, false, on_immediate, immediate_arg);
return;
}
}
}
reactor_.post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
}
} // namespace detail

View File

@@ -36,7 +36,7 @@ namespace asio {
namespace detail {
inline void select_reactor::post_immediate_completion(
reactor_op* op, bool is_continuation)
operation* op, bool is_continuation) const
{
scheduler_.post_immediate_completion(op, is_continuation);
}

View File

@@ -152,15 +152,23 @@ void select_reactor::move_descriptor(socket_type,
{
}
void select_reactor::call_post_immediate_completion(
operation* op, bool is_continuation, const void* self)
{
static_cast<const select_reactor*>(self)->post_immediate_completion(
op, is_continuation);
}
void select_reactor::start_op(int op_type, socket_type descriptor,
select_reactor::per_descriptor_data&, reactor_op* op,
bool is_continuation, bool)
select_reactor::per_descriptor_data&, reactor_op* op, bool is_continuation,
bool, void (*on_immediate)(operation*, bool, const void*),
const void* immediate_arg)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (shutdown_)
{
post_immediate_completion(op, is_continuation);
on_immediate(op, is_continuation, immediate_arg);
return;
}

View File

@@ -117,13 +117,30 @@ public:
per_descriptor_data& source_descriptor_data);
// Post a reactor operation for immediate completion.
void post_immediate_completion(operation* op, bool is_continuation);
void post_immediate_completion(operation* op, bool is_continuation) const;
// Post a reactor operation for immediate completion.
BOOST_ASIO_DECL static void call_post_immediate_completion(
operation* op, bool is_continuation, const void* self);
// Start a new operation. The reactor operation will be performed when the
// given descriptor is flagged as ready, or an error has occurred.
BOOST_ASIO_DECL void start_op(int op_type, socket_type descriptor,
per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative);
bool is_continuation, bool allow_speculative,
void (*on_immediate)(operation*, bool, const void*),
const void* immediate_arg);
// Start a new operation. The reactor operation will be performed when the
// given descriptor is flagged as ready, or an error has occurred.
void start_op(int op_type, socket_type descriptor,
per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
{
start_op(op_type, descriptor, descriptor_data,
op, is_continuation, allow_speculative,
&kqueue_reactor::call_post_immediate_completion, this);
}
// Cancel all operations associated with the given descriptor. The
// handlers associated with the descriptor will be invoked with the

View File

@@ -23,6 +23,7 @@
&& !defined(BOOST_ASIO_HAS_IO_URING_AS_DEFAULT)
#include <boost/asio/associated_cancellation_slot.hpp>
#include <boost/asio/associated_immediate_executor.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/execution_context.hpp>
@@ -228,19 +229,20 @@ public:
switch (w)
{
case posix::descriptor_base::wait_read:
op_type = reactor::read_op;
break;
op_type = reactor::read_op;
break;
case posix::descriptor_base::wait_write:
op_type = reactor::write_op;
break;
op_type = reactor::write_op;
break;
case posix::descriptor_base::wait_error:
op_type = reactor::except_op;
break;
default:
p.p->ec_ = boost::asio::error::invalid_argument;
reactor_.post_immediate_completion(p.p, is_continuation);
p.v = p.p = 0;
return;
op_type = reactor::except_op;
break;
default:
p.p->ec_ = boost::asio::error::invalid_argument;
start_op(impl, reactor::read_op, p.p,
is_continuation, false, true, &io_ex, 0);
p.v = p.p = 0;
return;
}
// Optionally register for per-operation cancellation.
@@ -251,7 +253,7 @@ public:
&reactor_, &impl.reactor_data_, impl.descriptor_, op_type);
}
start_op(impl, op_type, p.p, is_continuation, false, false);
start_op(impl, op_type, p.p, is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -325,7 +327,7 @@ public:
start_op(impl, reactor::write_op, p.p, is_continuation, true,
buffer_sequence_adapter<boost::asio::const_buffer,
ConstBufferSequence>::all_empty(buffers));
ConstBufferSequence>::all_empty(buffers), &io_ex, 0);
p.v = p.p = 0;
}
@@ -358,7 +360,8 @@ public:
BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "descriptor",
&impl, impl.descriptor_, "async_write_some(null_buffers)"));
start_op(impl, reactor::write_op, p.p, is_continuation, false, false);
start_op(impl, reactor::write_op, p.p,
is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -433,7 +436,7 @@ public:
start_op(impl, reactor::read_op, p.p, is_continuation, true,
buffer_sequence_adapter<boost::asio::mutable_buffer,
MutableBufferSequence>::all_empty(buffers));
MutableBufferSequence>::all_empty(buffers), &io_ex, 0);
p.v = p.p = 0;
}
@@ -466,14 +469,47 @@ public:
BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "descriptor",
&impl, impl.descriptor_, "async_read_some(null_buffers)"));
start_op(impl, reactor::read_op, p.p, is_continuation, false, false);
start_op(impl, reactor::read_op, p.p,
is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
private:
// Start the asynchronous operation.
BOOST_ASIO_DECL void start_op(implementation_type& impl, int op_type,
reactor_op* op, bool is_continuation, bool is_non_blocking, bool noop);
BOOST_ASIO_DECL void do_start_op(implementation_type& impl, int op_type,
reactor_op* op, bool is_continuation, bool is_non_blocking, bool noop,
void (*on_immediate)(operation* op, bool, const void*),
const void* immediate_arg);
// Start the asynchronous operation for handlers that are specialised for
// immediate completion.
template <typename Op>
void start_op(implementation_type& impl, int op_type, Op* op,
bool is_continuation, bool is_non_blocking, bool noop,
const void* io_ex, ...)
{
return do_start_op(impl, op_type, op, is_continuation,
is_non_blocking, noop, &Op::do_immediate, io_ex);
}
// Start the asynchronous operation for handlers that are not specialised for
// immediate completion.
template <typename Op>
void start_op(implementation_type& impl, int op_type, Op* op,
bool is_continuation, bool is_non_blocking, bool noop, const void*,
typename enable_if<
is_same<
typename associated_immediate_executor<
typename Op::handler_type,
typename Op::io_executor_type
>::asio_associated_immediate_executor_is_unspecialised,
void
>::value
>::type*)
{
return do_start_op(impl, op_type, op, is_continuation, is_non_blocking,
noop, &reactor::call_post_immediate_completion, &reactor_);
}
// Helper class used to implement per-operation cancellation
class reactor_op_cancellation

View File

@@ -34,6 +34,9 @@ template <typename Handler, typename IoExecutor>
class reactive_null_buffers_op : public reactor_op
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_null_buffers_op);
reactive_null_buffers_op(const boost::system::error_code& success_ec,
@@ -86,6 +89,35 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_null_buffers_op* o(static_cast<reactive_null_buffers_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder2<Handler, boost::system::error_code, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -96,6 +96,9 @@ class reactive_socket_accept_op :
public reactive_socket_accept_op_base<Socket, Protocol>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_accept_op);
reactive_socket_accept_op(const boost::system::error_code& success_ec,
@@ -152,6 +155,40 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_socket_accept_op* o(static_cast<reactive_socket_accept_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
// On success, assign new connection to peer socket object.
o->do_assign();
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder1<Handler, boost::system::error_code>
handler(o->handler_, o->ec_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;
@@ -168,6 +205,9 @@ class reactive_socket_move_accept_op :
Protocol>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_move_accept_op);
reactive_socket_move_accept_op(const boost::system::error_code& success_ec,
@@ -229,6 +269,43 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_socket_move_accept_op* o(
static_cast<reactive_socket_move_accept_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
// On success, assign new connection to peer socket object.
o->do_assign();
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::move_binder2<Handler,
boost::system::error_code, peer_socket_type>
handler(0, BOOST_ASIO_MOVE_CAST(Handler)(o->handler_), o->ec_,
BOOST_ASIO_MOVE_CAST(peer_socket_type)(*o));
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, "..."));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
typedef typename Protocol::socket::template
rebind_executor<PeerIoExecutor>::other peer_socket_type;

View File

@@ -63,6 +63,9 @@ template <typename Handler, typename IoExecutor>
class reactive_socket_connect_op : public reactive_socket_connect_op_base
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_connect_op);
reactive_socket_connect_op(const boost::system::error_code& success_ec,
@@ -113,6 +116,38 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_socket_connect_op* o
(static_cast<reactive_socket_connect_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder1<Handler, boost::system::error_code>
handler(o->handler_, o->ec_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -98,6 +98,9 @@ class reactive_socket_recv_op :
public reactive_socket_recv_op_base<MutableBufferSequence>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_recv_op);
reactive_socket_recv_op(const boost::system::error_code& success_ec,
@@ -149,6 +152,37 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_socket_recv_op* o(static_cast<reactive_socket_recv_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder2<Handler, boost::system::error_code, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -100,6 +100,9 @@ class reactive_socket_recvfrom_op :
public reactive_socket_recvfrom_op_base<MutableBufferSequence, Endpoint>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_recvfrom_op);
reactive_socket_recvfrom_op(const boost::system::error_code& success_ec,
@@ -154,6 +157,38 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_socket_recvfrom_op* o(
static_cast<reactive_socket_recvfrom_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder2<Handler, boost::system::error_code, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -81,6 +81,9 @@ class reactive_socket_recvmsg_op :
public reactive_socket_recvmsg_op_base<MutableBufferSequence>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_recvmsg_op);
reactive_socket_recvmsg_op(const boost::system::error_code& success_ec,
@@ -135,6 +138,38 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_socket_recvmsg_op* o(
static_cast<reactive_socket_recvmsg_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder2<Handler, boost::system::error_code, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -101,6 +101,9 @@ class reactive_socket_send_op :
public reactive_socket_send_op_base<ConstBufferSequence>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_send_op);
reactive_socket_send_op(const boost::system::error_code& success_ec,
@@ -152,6 +155,38 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_socket_send_op* o(static_cast<reactive_socket_send_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder2<Handler, boost::system::error_code, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -94,6 +94,9 @@ class reactive_socket_sendto_op :
public reactive_socket_sendto_op_base<ConstBufferSequence, Endpoint>
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_sendto_op);
reactive_socket_sendto_op(const boost::system::error_code& success_ec,
@@ -146,6 +149,37 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_socket_sendto_op* o(static_cast<reactive_socket_sendto_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
BOOST_ASIO_ERROR_LOCATION(o->ec_);
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder2<Handler, boost::system::error_code, std::size_t>
handler(o->handler_, o->ec_, o->bytes_transferred_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_, handler.arg2_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -307,7 +307,8 @@ public:
BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_send_to"));
start_op(impl, reactor::write_op, p.p, is_continuation, true, false);
start_op(impl, reactor::write_op, p.p,
is_continuation, true, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -340,7 +341,8 @@ public:
BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_send_to(null_buffers)"));
start_op(impl, reactor::write_op, p.p, is_continuation, false, false);
start_op(impl, reactor::write_op, p.p,
is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -431,7 +433,7 @@ public:
start_op(impl,
(flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation, true, false);
p.p, is_continuation, true, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -470,7 +472,7 @@ public:
start_op(impl,
(flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation, false, false);
p.p, is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -536,7 +538,7 @@ public:
BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_accept"));
start_accept_op(impl, p.p, is_continuation, peer.is_open());
start_accept_op(impl, p.p, is_continuation, peer.is_open(), &io_ex, 0);
p.v = p.p = 0;
}
@@ -573,7 +575,7 @@ public:
BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_accept"));
start_accept_op(impl, p.p, is_continuation, false);
start_accept_op(impl, p.p, is_continuation, false, &io_ex, 0);
p.v = p.p = 0;
}
#endif // defined(BOOST_ASIO_HAS_MOVE)
@@ -618,7 +620,7 @@ public:
&impl, impl.socket_, "async_connect"));
start_connect_op(impl, p.p, is_continuation,
peer_endpoint.data(), peer_endpoint.size());
peer_endpoint.data(), peer_endpoint.size(), &io_ex, 0);
p.v = p.p = 0;
}
};

View File

@@ -218,20 +218,21 @@ public:
int op_type;
switch (w)
{
case socket_base::wait_read:
op_type = reactor::read_op;
break;
case socket_base::wait_write:
op_type = reactor::write_op;
break;
case socket_base::wait_error:
op_type = reactor::except_op;
break;
default:
p.p->ec_ = boost::asio::error::invalid_argument;
reactor_.post_immediate_completion(p.p, is_continuation);
p.v = p.p = 0;
return;
case socket_base::wait_read:
op_type = reactor::read_op;
break;
case socket_base::wait_write:
op_type = reactor::write_op;
break;
case socket_base::wait_error:
op_type = reactor::except_op;
break;
default:
p.p->ec_ = boost::asio::error::invalid_argument;
start_op(impl, reactor::read_op, p.p,
is_continuation, false, true, &io_ex, 0);
p.v = p.p = 0;
return;
}
// Optionally register for per-operation cancellation.
@@ -242,7 +243,7 @@ public:
&reactor_, &impl.reactor_data_, impl.socket_, op_type);
}
start_op(impl, op_type, p.p, is_continuation, false, false);
start_op(impl, op_type, p.p, is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -314,7 +315,7 @@ public:
start_op(impl, reactor::write_op, p.p, is_continuation, true,
((impl.state_ & socket_ops::stream_oriented)
&& buffer_sequence_adapter<boost::asio::const_buffer,
ConstBufferSequence>::all_empty(buffers)));
ConstBufferSequence>::all_empty(buffers)), &io_ex, 0);
p.v = p.p = 0;
}
@@ -346,7 +347,8 @@ public:
BOOST_ASIO_HANDLER_CREATION((reactor_.context(), *p.p, "socket",
&impl, impl.socket_, "async_send(null_buffers)"));
start_op(impl, reactor::write_op, p.p, is_continuation, false, false);
start_op(impl, reactor::write_op, p.p,
is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -423,7 +425,7 @@ public:
(flags & socket_base::message_out_of_band) == 0,
((impl.state_ & socket_ops::stream_oriented)
&& buffer_sequence_adapter<boost::asio::mutable_buffer,
MutableBufferSequence>::all_empty(buffers)));
MutableBufferSequence>::all_empty(buffers)), &io_ex, 0);
p.v = p.p = 0;
}
@@ -459,7 +461,7 @@ public:
start_op(impl,
(flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation, false, false);
p.p, is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -531,7 +533,7 @@ public:
(in_flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation,
(in_flags & socket_base::message_out_of_band) == 0, false);
(in_flags & socket_base::message_out_of_band) == 0, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -572,7 +574,7 @@ public:
start_op(impl,
(in_flags & socket_base::message_out_of_band)
? reactor::except_op : reactor::read_op,
p.p, is_continuation, false, false);
p.p, is_continuation, false, false, &io_ex, 0);
p.v = p.p = 0;
}
@@ -588,16 +590,111 @@ protected:
const native_handle_type& native_socket, boost::system::error_code& ec);
// Start the asynchronous read or write operation.
BOOST_ASIO_DECL void start_op(base_implementation_type& impl, int op_type,
reactor_op* op, bool is_continuation, bool is_non_blocking, bool noop);
BOOST_ASIO_DECL void do_start_op(base_implementation_type& impl, int op_type,
reactor_op* op, bool is_continuation, bool is_non_blocking, bool noop,
void (*on_immediate)(operation* op, bool, const void*),
const void* immediate_arg);
// Start the asynchronous operation for handlers that are specialised for
// immediate completion.
template <typename Op>
void start_op(base_implementation_type& impl, int op_type, Op* op,
bool is_continuation, bool is_non_blocking, bool noop,
const void* io_ex, ...)
{
return do_start_op(impl, op_type, op, is_continuation,
is_non_blocking, noop, &Op::do_immediate, io_ex);
}
// Start the asynchronous operation for handlers that are not specialised for
// immediate completion.
template <typename Op>
void start_op(base_implementation_type& impl, int op_type, Op* op,
bool is_continuation, bool is_non_blocking, bool noop, const void*,
typename enable_if<
is_same<
typename associated_immediate_executor<
typename Op::handler_type,
typename Op::io_executor_type
>::asio_associated_immediate_executor_is_unspecialised,
void
>::value
>::type*)
{
return do_start_op(impl, op_type, op, is_continuation, is_non_blocking,
noop, &reactor::call_post_immediate_completion, &reactor_);
}
// Start the asynchronous accept operation.
BOOST_ASIO_DECL void start_accept_op(base_implementation_type& impl,
reactor_op* op, bool is_continuation, bool peer_is_open);
BOOST_ASIO_DECL void do_start_accept_op(base_implementation_type& impl,
reactor_op* op, bool is_continuation, bool peer_is_open,
void (*on_immediate)(operation* op, bool, const void*),
const void* immediate_arg);
// Start the asynchronous accept operation for handlers that are specialised
// for immediate completion.
template <typename Op>
void start_accept_op(base_implementation_type& impl, Op* op,
bool is_continuation, bool peer_is_open, const void* io_ex, ...)
{
return do_start_accept_op(impl, op, is_continuation,
peer_is_open, &Op::do_immediate, io_ex);
}
// Start the asynchronous operation for handlers that are not specialised for
// immediate completion.
template <typename Op>
void start_accept_op(base_implementation_type& impl, Op* op,
bool is_continuation, bool peer_is_open, const void*,
typename enable_if<
is_same<
typename associated_immediate_executor<
typename Op::handler_type,
typename Op::io_executor_type
>::asio_associated_immediate_executor_is_unspecialised,
void
>::value
>::type*)
{
return do_start_accept_op(impl, op, is_continuation, peer_is_open,
&reactor::call_post_immediate_completion, &reactor_);
}
// Start the asynchronous connect operation.
BOOST_ASIO_DECL void start_connect_op(base_implementation_type& impl,
reactor_op* op, bool is_continuation, const void* addr, size_t addrlen);
BOOST_ASIO_DECL void do_start_connect_op(base_implementation_type& impl,
reactor_op* op, bool is_continuation, const void* addr, size_t addrlen,
void (*on_immediate)(operation* op, bool, const void*),
const void* immediate_arg);
// Start the asynchronous operation for handlers that are specialised for
// immediate completion.
template <typename Op>
void start_connect_op(base_implementation_type& impl,
Op* op, bool is_continuation, const void* addr,
size_t addrlen, const void* io_ex, ...)
{
return do_start_connect_op(impl, op, is_continuation,
addr, addrlen, &Op::do_immediate, io_ex);
}
// Start the asynchronous operation for handlers that are not specialised for
// immediate completion.
template <typename Op>
void start_connect_op(base_implementation_type& impl, Op* op,
bool is_continuation, const void* addr, size_t addrlen, const void*,
typename enable_if<
is_same<
typename associated_immediate_executor<
typename Op::handler_type,
typename Op::io_executor_type
>::asio_associated_immediate_executor_is_unspecialised,
void
>::value
>::type*)
{
return do_start_connect_op(impl, op, is_continuation, addr,
addrlen, &reactor::call_post_immediate_completion, &reactor_);
}
// Helper class used to implement per-operation cancellation
class reactor_op_cancellation

View File

@@ -34,6 +34,9 @@ template <typename Handler, typename IoExecutor>
class reactive_wait_op : public reactor_op
{
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_wait_op);
reactive_wait_op(const boost::system::error_code& success_ec,
@@ -86,6 +89,35 @@ public:
}
}
static void do_immediate(operation* base, bool, const void* io_ex)
{
// Take ownership of the handler object.
reactive_wait_op* o(static_cast<reactive_wait_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
BOOST_ASIO_HANDLER_COMPLETION((*o));
// Take ownership of the operation's outstanding work.
immediate_handler_work<Handler, IoExecutor> w(
BOOST_ASIO_MOVE_CAST2(handler_work<Handler, IoExecutor>)(
o->work_));
// Make a copy of the handler so that the memory can be deallocated before
// the upcall is made. Even if we're not about to make an upcall, a
// sub-object of the handler may be the true owner of the memory associated
// with the handler. Consequently, a local copy of the handler is required
// to ensure that any owning sub-object remains valid until after we have
// deallocated the memory here.
detail::binder1<Handler, boost::system::error_code>
handler(o->handler_, o->ec_);
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
BOOST_ASIO_HANDLER_INVOCATION_BEGIN((handler.arg1_));
w.complete(handler, handler.handler_, io_ex);
BOOST_ASIO_HANDLER_INVOCATION_END;
}
private:
Handler handler_;
handler_work<Handler, IoExecutor> work_;

View File

@@ -95,12 +95,29 @@ public:
per_descriptor_data& descriptor_data, reactor_op* op);
// Post a reactor operation for immediate completion.
void post_immediate_completion(reactor_op* op, bool is_continuation);
void post_immediate_completion(operation* op, bool is_continuation) const;
// Post a reactor operation for immediate completion.
BOOST_ASIO_DECL static void call_post_immediate_completion(
operation* op, bool is_continuation, const void* self);
// Start a new operation. The reactor operation will be performed when the
// given descriptor is flagged as ready, or an error has occurred.
BOOST_ASIO_DECL void start_op(int op_type, socket_type descriptor,
per_descriptor_data&, reactor_op* op, bool is_continuation, bool);
per_descriptor_data&, reactor_op* op, bool is_continuation, bool,
void (*on_immediate)(operation*, bool, const void*),
const void* immediate_arg);
// Start a new operation. The reactor operation will be performed when the
// given descriptor is flagged as ready, or an error has occurred.
void start_op(int op_type, socket_type descriptor,
per_descriptor_data& descriptor_data, reactor_op* op,
bool is_continuation, bool allow_speculative)
{
start_op(op_type, descriptor, descriptor_data,
op, is_continuation, allow_speculative,
&select_reactor::call_post_immediate_completion, this);
}
// Cancel all operations associated with the given descriptor. The
// handlers associated with the descriptor will be invoked with the