2
0
mirror of https://github.com/boostorg/mysql.git synced 2026-02-18 14:12:32 +00:00

Updated channel to use async_op

Moved async_op into channel definition header
This commit is contained in:
ruben
2020-04-23 14:11:21 +01:00
parent 6924d0c813
commit a5da4451fb
5 changed files with 169 additions and 187 deletions

View File

@@ -1,14 +1,12 @@
Sanitize
Test zero dates
Connection quit
Base class for async stuff: make it usable for channel
Connection connect
Close/quit to channel
Integration test for network errors (e.g. host unreachable)
Class with an error info, error code and a check method
Change read_row_result async signature to use error_info*
Attempt to remove shared_ptr's from ops
Replace yields by asio macros
Simplify channel
Better docs
Wandbox
Breaking up the tutorial in pieces

View File

@@ -1,99 +0,0 @@
//
// Copyright (c) 2019-2020 Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#ifndef INCLUDE_BOOST_MYSQL_DETAIL_NETWORK_ALGORITHMS_ASYNC_OP_HPP_
#define INCLUDE_BOOST_MYSQL_DETAIL_NETWORK_ALGORITHMS_ASYNC_OP_HPP_
#include "boost/mysql/detail/protocol/channel.hpp"
#include "boost/mysql/error.hpp"
#include <boost/asio/coroutine.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/beast/core/async_base.hpp>
namespace boost {
namespace mysql {
namespace detail {
template <
typename Stream,
typename CompletionToken,
typename HandlerSignature
>
using async_op_base = boost::beast::async_base<
BOOST_ASIO_HANDLER_TYPE(CompletionToken, HandlerSignature),
typename Stream::executor_type
>;
template <
typename Stream,
typename CompletionToken,
typename HandlerSignature,
typename Derived // CRTP
>
class async_op :
public boost::asio::coroutine,
public async_op_base<Stream, CompletionToken, HandlerSignature>
{
channel<Stream>& channel_;
error_info* output_info_;
using base_type = async_op_base<Stream, CompletionToken, HandlerSignature>;
public:
using async_completion_type = boost::asio::async_completion<CompletionToken, HandlerSignature>;
async_op(async_completion_type& initiator, channel<Stream>& chan, error_info* output_info) :
base_type(std::move(initiator.completion_handler), chan.next_layer().get_executor()),
channel_(chan),
output_info_(output_info)
{
}
channel<Stream>& get_channel() noexcept { return channel_; }
error_info* get_output_info() noexcept { return output_info_; }
template <typename... Args>
static auto initiate(
CompletionToken&& token,
channel<Stream>& chan,
error_info* output_info,
Args&&... args
)
{
async_completion_type completion (token);
Derived op (completion, chan, output_info, std::forward<Args>(args)...);
op(error_code());
return completion.result.get();
}
// Reads from channel against the channel internal buffer, using itself as
void async_read() { async_read(channel_.shared_buffer()); }
void async_read(bytestring& buff)
{
channel_.async_read(
buff,
std::move(static_cast<Derived&>(*this))
);
}
void async_write(const bytestring& buff)
{
channel_.async_write(
boost::asio::buffer(buff),
std::move(static_cast<Derived&>(*this))
);
}
void async_write() { async_write(channel_.shared_buffer()); }
};
} // detail
} // mysql
} // boost
#endif /* INCLUDE_BOOST_MYSQL_DETAIL_NETWORK_ALGORITHMS_ASYNC_OP_HPP_ */

View File

@@ -12,7 +12,6 @@
#include "boost/mysql/metadata.hpp"
#include "boost/mysql/detail/protocol/channel.hpp"
#include "boost/mysql/detail/protocol/common_messages.hpp"
#include "boost/mysql/detail/network_algorithms/async_op.hpp"
namespace boost {
namespace mysql {

View File

@@ -15,8 +15,8 @@
#include <boost/asio/async_result.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/beast/core/async_base.hpp>
#include <array>
#include <optional>
@@ -24,22 +24,22 @@ namespace boost {
namespace mysql {
namespace detail {
template <typename AsyncStream>
// Implements the message layer of the MySQL protocol
template <typename Stream>
class channel
{
// TODO: static asserts for AsyncStream concept
// TODO: actually we also require it to be SyncStream, name misleading
// TODO: static asserts for Stream concept
struct ssl_block
{
boost::asio::ssl::context ctx;
boost::asio::ssl::stream<AsyncStream&> stream;
boost::asio::ssl::stream<Stream&> stream;
ssl_block(AsyncStream& base_stream):
ssl_block(Stream& base_stream):
ctx(boost::asio::ssl::context::tls_client),
stream (base_stream, ctx) {}
};
AsyncStream& stream_;
Stream& stream_;
std::optional<ssl_block> ssl_block_;
std::uint8_t sequence_number_ {0};
std::array<std::uint8_t, 4> header_buffer_ {}; // for async ops
@@ -66,42 +66,129 @@ class channel
template <typename BufferSeq, typename CompletionToken>
auto async_write_impl(BufferSeq&& buff, CompletionToken&& token);
public:
channel(AsyncStream& stream): stream_(stream) {}
bool ssl_active() const noexcept { return ssl_block_.has_value(); }
channel(Stream& stream): stream_(stream) {}
// Reading
template <typename Allocator>
void read(basic_bytestring<Allocator>& buffer, error_code& code);
void write(boost::asio::const_buffer buffer, error_code& code);
using read_signature = void(error_code);
template <typename Allocator, typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(error_code))
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, read_signature)
async_read(basic_bytestring<Allocator>& buffer, CompletionToken&& token);
// Writing
void write(boost::asio::const_buffer buffer, error_code& code);
using write_signature = void(error_code);
template <typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(error_code))
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, write_signature)
async_write(boost::asio::const_buffer buffer, CompletionToken&& token);
// SSL
bool ssl_active() const noexcept { return ssl_block_.has_value(); }
void ssl_handshake(error_code& ec);
using ssl_handshake_signature = void(error_code);
template <typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(error_code))
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, ssl_handshake_signature)
async_ssl_handshake(CompletionToken&& token);
// Sequence numbers
void reset_sequence_number(std::uint8_t value = 0) { sequence_number_ = value; }
std::uint8_t sequence_number() const { return sequence_number_; }
using stream_type = AsyncStream;
// Getting the underlying stream
using stream_type = Stream;
stream_type& next_layer() { return stream_; }
// Capabilities
capabilities current_capabilities() const noexcept { return current_caps_; }
void set_current_capabilities(capabilities value) noexcept { current_caps_ = value; }
// Internal buffer
const bytestring& shared_buffer() const noexcept { return shared_buff_; }
bytestring& shared_buffer() noexcept { return shared_buff_; }
};
template <
typename Stream,
typename CompletionToken,
typename HandlerSignature
>
using async_op_base = boost::beast::async_base<
BOOST_ASIO_HANDLER_TYPE(CompletionToken, HandlerSignature),
typename Stream::executor_type
>;
// The base for async operations involving a channel
// Defined here so it can also be used for channel function implementation
template <
typename Stream,
typename CompletionToken,
typename HandlerSignature,
typename Derived // CRTP
>
class async_op :
public boost::asio::coroutine,
public async_op_base<Stream, CompletionToken, HandlerSignature>
{
channel<Stream>& channel_;
error_info* output_info_;
using base_type = async_op_base<Stream, CompletionToken, HandlerSignature>;
public:
using async_completion_type = boost::asio::async_completion<CompletionToken, HandlerSignature>;
async_op(async_completion_type& initiator, channel<Stream>& chan, error_info* output_info) :
base_type(std::move(initiator.completion_handler), chan.next_layer().get_executor()),
channel_(chan),
output_info_(output_info)
{
}
channel<Stream>& get_channel() noexcept { return channel_; }
error_info* get_output_info() noexcept { return output_info_; }
template <typename... Args>
static auto initiate(
CompletionToken&& token,
channel<Stream>& chan,
error_info* output_info,
Args&&... args
)
{
async_completion_type completion (token);
Derived op (completion, chan, output_info, std::forward<Args>(args)...);
op(error_code());
return completion.result.get();
}
// Reads from channel against the channel internal buffer, using itself as
void async_read() { async_read(channel_.shared_buffer()); }
void async_read(bytestring& buff)
{
channel_.async_read(
buff,
std::move(static_cast<Derived&>(*this))
);
}
void async_write(const bytestring& buff)
{
channel_.async_write(
boost::asio::buffer(buff),
std::move(static_cast<Derived&>(*this))
);
}
void async_write() { async_write(channel_.shared_buffer()); }
};
} // detail
} // mysql

View File

@@ -8,7 +8,8 @@
#ifndef BOOST_MYSQL_DETAIL_PROTOCOL_IMPL_CHANNEL_HPP
#define BOOST_MYSQL_DETAIL_PROTOCOL_IMPL_CHANNEL_HPP
#include <boost/beast/core/async_base.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <cassert>
#include "boost/mysql/detail/protocol/common_messages.hpp"
#include "boost/mysql/detail/protocol/constants.hpp"
@@ -34,8 +35,8 @@ inline std::uint32_t compute_size_to_write(
} // mysql
} // boost
template <typename AsyncStream>
bool boost::mysql::detail::channel<AsyncStream>::process_sequence_number(
template <typename Stream>
bool boost::mysql::detail::channel<Stream>::process_sequence_number(
std::uint8_t got
)
{
@@ -50,8 +51,8 @@ bool boost::mysql::detail::channel<AsyncStream>::process_sequence_number(
}
}
template <typename AsyncStream>
boost::mysql::error_code boost::mysql::detail::channel<AsyncStream>::process_header_read(
template <typename Stream>
boost::mysql::error_code boost::mysql::detail::channel<Stream>::process_header_read(
std::uint32_t& size_to_read
)
{
@@ -67,8 +68,8 @@ boost::mysql::error_code boost::mysql::detail::channel<AsyncStream>::process_hea
return error_code();
}
template <typename AsyncStream>
void boost::mysql::detail::channel<AsyncStream>::process_header_write(
template <typename Stream>
void boost::mysql::detail::channel<Stream>::process_header_write(
std::uint32_t size_to_write
)
{
@@ -79,9 +80,9 @@ void boost::mysql::detail::channel<AsyncStream>::process_header_write(
serialize(header, ctx);
}
template <typename AsyncStream>
template <typename Stream>
template <typename BufferSeq>
std::size_t boost::mysql::detail::channel<AsyncStream>::read_impl(
std::size_t boost::mysql::detail::channel<Stream>::read_impl(
BufferSeq&& buff,
error_code& ec
)
@@ -96,9 +97,9 @@ std::size_t boost::mysql::detail::channel<AsyncStream>::read_impl(
}
}
template <typename AsyncStream>
template <typename Stream>
template <typename BufferSeq>
std::size_t boost::mysql::detail::channel<AsyncStream>::write_impl(
std::size_t boost::mysql::detail::channel<Stream>::write_impl(
BufferSeq&& buff,
error_code& ec
)
@@ -113,9 +114,9 @@ std::size_t boost::mysql::detail::channel<AsyncStream>::write_impl(
}
}
template <typename AsyncStream>
template <typename Stream>
template <typename BufferSeq, typename CompletionToken>
auto boost::mysql::detail::channel<AsyncStream>::async_read_impl(
auto boost::mysql::detail::channel<Stream>::async_read_impl(
BufferSeq&& buff,
CompletionToken&& token
)
@@ -138,9 +139,9 @@ auto boost::mysql::detail::channel<AsyncStream>::async_read_impl(
}
}
template <typename AsyncStream>
template <typename Stream>
template <typename BufferSeq, typename CompletionToken>
auto boost::mysql::detail::channel<AsyncStream>::async_write_impl(
auto boost::mysql::detail::channel<Stream>::async_write_impl(
BufferSeq&& buff,
CompletionToken&& token
)
@@ -163,9 +164,9 @@ auto boost::mysql::detail::channel<AsyncStream>::async_write_impl(
}
}
template <typename AsyncStream>
template <typename Stream>
template <typename Allocator>
void boost::mysql::detail::channel<AsyncStream>::read(
void boost::mysql::detail::channel<Stream>::read(
basic_bytestring<Allocator>& buffer,
error_code& code
)
@@ -198,8 +199,8 @@ void boost::mysql::detail::channel<AsyncStream>::read(
} while (size_to_read == MAX_PACKET_SIZE);
}
template <typename AsyncStream>
void boost::mysql::detail::channel<AsyncStream>::write(
template <typename Stream>
void boost::mysql::detail::channel<Stream>::write(
boost::asio::const_buffer buffer,
error_code& code
)
@@ -227,40 +228,36 @@ void boost::mysql::detail::channel<AsyncStream>::write(
}
template <typename AsyncStream>
template <typename Stream>
template <typename Allocator, typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(boost::mysql::error_code))
boost::mysql::detail::channel<AsyncStream>::async_read(
BOOST_ASIO_INITFN_RESULT_TYPE(
CompletionToken,
typename boost::mysql::detail::channel<Stream>::read_signature
)
boost::mysql::detail::channel<Stream>::async_read(
basic_bytestring<Allocator>& buffer,
CompletionToken&& token
)
{
using HandlerSignature = void(mysql::error_code);
using HandlerType = BOOST_ASIO_HANDLER_TYPE(CompletionToken, HandlerSignature);
using BaseType = boost::beast::async_base<HandlerType, typename AsyncStream::executor_type>;
boost::asio::async_completion<CompletionToken, HandlerSignature> initiator(token);
struct Op: BaseType, boost::asio::coroutine
struct op : async_op<Stream, CompletionToken, read_signature, op>
{
channel<AsyncStream>& stream_;
basic_bytestring<Allocator>& buffer_;
std::size_t total_transferred_size_ = 0;
Op(
HandlerType&& handler,
channel<AsyncStream>& stream,
op(
boost::asio::async_completion<CompletionToken, read_signature>& completion,
channel<Stream>& chan,
error_info* output_info,
basic_bytestring<Allocator>& buffer
):
BaseType(std::move(handler), stream.next_layer().get_executor()),
stream_(stream),
) :
async_op<Stream, CompletionToken, read_signature, op>(completion, chan, output_info),
buffer_(buffer)
{
}
void operator()(
error_code code,
std::size_t bytes_transferred,
std::size_t bytes_transferred=0,
bool cont=true
)
{
@@ -273,17 +270,18 @@ boost::mysql::detail::channel<AsyncStream>::async_read(
// Non-error path
std::uint32_t size_to_read = 0;
channel<Stream>& chan = this->get_channel();
reenter(*this)
{
do
{
yield stream_.async_read_impl(
boost::asio::buffer(stream_.header_buffer_),
yield chan.async_read_impl(
boost::asio::buffer(chan.header_buffer_),
std::move(*this)
);
valgrind_make_mem_defined(boost::asio::buffer(stream_.header_buffer_));
valgrind_make_mem_defined(boost::asio::buffer(chan.header_buffer_));
code = stream_.process_header_read(size_to_read);
code = chan.process_header_read(size_to_read);
if (code)
{
this->complete(cont, code);
@@ -292,7 +290,7 @@ boost::mysql::detail::channel<AsyncStream>::async_read(
buffer_.resize(buffer_.size() + size_to_read);
yield stream_.async_read_impl(
yield chan.async_read_impl(
boost::asio::buffer(buffer_.data() + total_transferred_size_, size_to_read),
std::move(*this)
);
@@ -309,44 +307,39 @@ boost::mysql::detail::channel<AsyncStream>::async_read(
};
buffer.clear();
Op(std::move(initiator.completion_handler), *this, buffer)(error_code(), 0, false);
return initiator.result.get();
return op::initiate(std::forward<CompletionToken>(token), *this, nullptr, buffer);
}
template <typename AsyncStream>
template <typename Stream>
template <typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(boost::mysql::error_code))
boost::mysql::detail::channel<AsyncStream>::async_write(
BOOST_ASIO_INITFN_RESULT_TYPE(
CompletionToken,
typename boost::mysql::detail::channel<Stream>::write_signature
)
boost::mysql::detail::channel<Stream>::async_write(
boost::asio::const_buffer buffer,
CompletionToken&& token
)
{
using HandlerSignature = void(mysql::error_code);
using HandlerType = BOOST_ASIO_HANDLER_TYPE(CompletionToken, HandlerSignature);
using BaseType = boost::beast::async_base<HandlerType, typename AsyncStream::executor_type>;
boost::asio::async_completion<CompletionToken, HandlerSignature> initiator(token);
struct Op : BaseType, boost::asio::coroutine
struct op : async_op<Stream, CompletionToken, write_signature, op>
{
channel<AsyncStream>& stream_;
boost::asio::const_buffer buffer_;
std::size_t total_transferred_size_ = 0;
Op(
HandlerType&& handler,
channel<AsyncStream>& stream,
op(
boost::asio::async_completion<CompletionToken, write_signature>& completion,
channel<Stream>& chan,
error_info* output_info,
boost::asio::const_buffer buffer
):
BaseType(std::move(handler), stream.next_layer().get_executor()),
stream_(stream),
) :
async_op<Stream, CompletionToken, write_signature, op>(completion, chan, output_info),
buffer_(buffer)
{
}
void operator()(
error_code code,
std::size_t bytes_transferred,
std::size_t bytes_transferred=0,
bool cont=true
)
{
@@ -359,17 +352,18 @@ boost::mysql::detail::channel<AsyncStream>::async_write(
// Non-error path
std::uint32_t size_to_write;
channel<Stream>& chan = this->get_channel();
reenter(*this)
{
// Force write the packet header on an empty packet, at least.
do
{
size_to_write = compute_size_to_write(buffer_.size(), total_transferred_size_);
stream_.process_header_write(size_to_write);
chan.process_header_write(size_to_write);
yield stream_.async_write_impl(
yield chan.async_write_impl(
std::array<boost::asio::const_buffer, 2> {
boost::asio::buffer(stream_.header_buffer_),
boost::asio::buffer(chan.header_buffer_),
boost::asio::buffer(buffer_ + total_transferred_size_, size_to_write)
},
std::move(*this)
@@ -385,8 +379,7 @@ boost::mysql::detail::channel<AsyncStream>::async_write(
};
Op(std::move(initiator.completion_handler), *this, buffer)(error_code(), 0, false);
return initiator.result.get();
return op::initiate(std::forward<CompletionToken>(token), *this, nullptr, buffer);
}
template <typename Stream>
@@ -400,7 +393,10 @@ void boost::mysql::detail::channel<Stream>::ssl_handshake(
template <typename Stream>
template <typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(boost::mysql::error_code))
BOOST_ASIO_INITFN_RESULT_TYPE(
CompletionToken,
typename boost::mysql::detail::channel<Stream>::ssl_handshake_signature
)
boost::mysql::detail::channel<Stream>::async_ssl_handshake(
CompletionToken&& token
)
@@ -412,6 +408,7 @@ boost::mysql::detail::channel<Stream>::async_ssl_handshake(
);
}
#include <boost/asio/unyield.hpp>