2
0
mirror of https://github.com/boostorg/mysql.git synced 2026-02-15 01:02:17 +00:00

Modified execute_query to allow async code reuse

This commit is contained in:
ruben
2019-11-17 21:01:32 +00:00
parent 418e627f5e
commit 1c12be9aca
3 changed files with 270 additions and 64 deletions

View File

@@ -66,5 +66,19 @@ mysql::resultset<Stream, Allocator> mysql::connection<Stream, Allocator>::query(
return res;
}
template <typename Stream, typename Allocator>
template <typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(mysql::error_code, mysql::resultset<Stream, Allocator>))
mysql::connection<Stream, Allocator>::async_query(
std::string_view query_string,
CompletionToken&& token
)
{
return detail::async_execute_query<channel_type, Allocator>(
query_string,
std::forward<CompletionToken>(token)
);
}
#endif /* INCLUDE_MYSQL_IMPL_CONNECTION_IMPL_HPP_ */

View File

@@ -10,6 +10,12 @@ namespace mysql
namespace detail
{
template <typename ChannelType>
using channel_stream_type = typename ChannelType::stream_type;
template <typename ChannelType, typename Allocator>
using channel_resultset_type = resultset<channel_stream_type<ChannelType>, Allocator>;
enum class fetch_result
{
error,
@@ -21,10 +27,19 @@ template <typename ChannelType, typename Allocator>
void execute_query(
ChannelType& channel,
std::string_view query,
resultset<ChannelType, Allocator>& output,
channel_resultset_type<ChannelType, Allocator>& output,
error_code& err
);
template <typename ChannelType, typename Allocator, typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(error_code, channel_resultset_type<ChannelType, Allocator>))
async_execute_query(
ChannelType& channel,
std::string_view query,
CompletionToken&& token
);
template <typename ChannelType, typename Allocator>
fetch_result fetch_text_row(
ChannelType& channel,

View File

@@ -4,17 +4,114 @@
#include "mysql/impl/messages.hpp"
#include "mysql/impl/basic_serialization.hpp"
#include "mysql/impl/deserialize_row.hpp"
#include <boost/asio/yield.hpp>
namespace mysql
{
namespace detail
{
template <typename ChannelType>
using channel_stream_type = typename ChannelType::stream_type;
}
}
template <typename ChannelType, typename Allocator>
class query_processor
{
ChannelType& channel_;
bytestring<Allocator> buffer_;
std::vector<field_metadata> fields_;
std::vector<bytestring<Allocator>> field_buffers_;
public:
query_processor(ChannelType& channel): channel_(channel) {};
void process_query_request(
std::string_view query
)
{
// Compose a com_query message
msgs::com_query query_msg;
query_msg.query.value = query;
// Serialize it
capabilities caps = channel_.current_capabilities();
serialize_message(query_msg, caps, buffer_);
// Prepare the channel
channel_.reset_sequence_number();
}
std::optional<std::uint64_t> // ok, err, or a number of fields
process_query_response(
channel_resultset_type<ChannelType, Allocator>& output,
error_code& err
)
{
// Response may be: ok_packet, err_packet, local infile request (TODO)
// If it is none of this, then the message type itself is the beginning of
// a length-encoded int containing the field count
DeserializationContext ctx (boost::asio::buffer(buffer_), channel_.current_capabilities());
std::uint8_t msg_type;
err = deserialize_message_type(msg_type, ctx);
if (err) return {};
if (msg_type == ok_packet_header)
{
msgs::ok_packet ok_packet;
err = deserialize_message(ok_packet, ctx);
if (err) return {};
output = channel_resultset_type<ChannelType, Allocator>(channel_, std::move(buffer_), ok_packet);
err.clear();
return {};
}
else if (msg_type == error_packet_header)
{
err = process_error_packet(ctx);
return {};
}
else
{
// Resultset with metadata. First packet is an int_lenenc with
// the number of field definitions to expect. Message type is part
// of this packet, so we must rewind the context
ctx.rewind(1);
int_lenenc num_fields;
err = deserialize_message(num_fields, ctx);
if (err) return {};
fields_.reserve(num_fields.value);
field_buffers_.reserve(num_fields.value);
return num_fields.value;
}
}
error_code process_field_definition()
{
msgs::column_definition field_definition;
DeserializationContext ctx (boost::asio::buffer(buffer_), channel_.current_capabilities());
auto err = deserialize_message(field_definition, ctx);
if (err) return err;
// Add it to our array
fields_.push_back(field_definition);
field_buffers_.push_back(std::move(buffer_));
buffer_ = bytestring<Allocator>();
return error_code();
}
void create_resultset(
channel_resultset_type<ChannelType, Allocator>& output
) &&
{
output = channel_resultset_type<ChannelType, Allocator>(
channel_,
resultset_metadata<Allocator>(std::move(field_buffers_), std::move(fields_))
);
}
auto& channel() { return channel_; }
auto& buffer() { return buffer_; }
};
} // detail
} // mysql
template <typename ChannelType, typename Allocator>
void mysql::detail::execute_query(
@@ -24,86 +121,166 @@ void mysql::detail::execute_query(
error_code& err
)
{
// Compose a com_query message
msgs::com_query query_msg;
query_msg.query.value = query;
// Serialize it
capabilities caps = channel.current_capabilities();
bytestring<Allocator> buffer;
serialize_message(query_msg, caps, buffer);
// Compose a com_query message, reset seq num
query_processor<ChannelType, Allocator> processor (channel);
processor.process_query_request(query);
// Send it
channel.reset_sequence_number();
channel.write(boost::asio::buffer(buffer), err);
channel.write(boost::asio::buffer(processor.buffer()), err);
if (err) return;
// Read the response
channel.read(buffer, err);
channel.read(processor.buffer(), err);
if (err) return;
// Response may be: ok_packet, err_packet, local infile request (TODO)
// If it is none of this, then the message type itself is the beginning of
// a length-encoded int containing the field count
DeserializationContext ctx (boost::asio::buffer(buffer), caps);
std::uint8_t msg_type;
err = deserialize_message_type(msg_type, ctx);
if (err) return;
if (msg_type == ok_packet_header)
// Response may be: ok_packet, err_packet, local infile request (TODO), or response with fields
auto num_fields = processor.process_query_response(output, err);
if (!num_fields) // ok or err
{
msgs::ok_packet ok_packet;
err = deserialize_message(ok_packet, ctx);
if (err) return;
output = resultset<channel_stream_type<ChannelType>, Allocator>(channel, std::move(buffer), ok_packet);
err.clear();
return;
}
else if (msg_type == error_packet_header)
{
err = process_error_packet(ctx);
return;
}
// Resultset with metadata. First packet is an int_lenenc with
// the number of field definitions to expect. Message type is part
// of this packet, so we must rewind the context
ctx.set_first(buffer.data());
int_lenenc num_fields;
err = deserialize_message(num_fields, ctx);
if (err) return;
std::vector<field_metadata> fields;
std::vector<bytestring<Allocator>> field_buffers;
fields.reserve(num_fields.value);
field_buffers.reserve(num_fields.value);
// Read all of the field definitions
for (std::uint64_t i = 0; i < num_fields.value; ++i)
// We have a response with metadata, read all of the field definitions
for (std::uint64_t i = 0; i < *num_fields; ++i)
{
// Read the field definition packet
bytestring<Allocator> field_definition_buffer;
channel.read(field_definition_buffer, err);
channel.read(processor.buffer(), err);
if (err) return;
// Deserialize the message
msgs::column_definition field_definition;
ctx = DeserializationContext(boost::asio::buffer(field_definition_buffer), caps);
err = deserialize_message(field_definition, ctx);
// Process the message
err = processor.process_field_definition();
if (err) return;
// Add it to our array
fields.push_back(field_definition);
field_buffers.push_back(std::move(field_definition_buffer));
}
// No EOF packet is expected here, as we require deprecate EOF capabilities
output = resultset<channel_stream_type<ChannelType>, Allocator>(
channel,
resultset_metadata<Allocator>(std::move(field_buffers), std::move(fields))
);
std::move(processor).create_resultset(output);
err.clear();
}
template <typename ChannelType, typename Allocator, typename CompletionToken>
BOOST_ASIO_INITFN_RESULT_TYPE(
CompletionToken,
void(mysql::error_code, mysql::detail::channel_resultset_type<ChannelType, Allocator>)
)
mysql::detail::async_execute_query(
ChannelType& channel,
std::string_view query,
CompletionToken&& token
)
{
using HandlerSignature = void(error_code, channel_resultset_type<ChannelType, Allocator>);
using HandlerType = BOOST_ASIO_HANDLER_TYPE(CompletionToken, HandlerSignature);
using StreamType = typename ChannelType::stream_type;
using BaseType = boost::beast::async_base<HandlerType, typename StreamType::executor_type>;
using ResultsetType = channel_resultset_type<ChannelType, Allocator>;
boost::asio::async_completion<CompletionToken, HandlerSignature> initiator(token);
struct Op: BaseType, boost::asio::coroutine
{
query_processor<ChannelType, Allocator> processor_;
Op(
HandlerType&& handler,
ChannelType& channel,
std::string_view query
):
BaseType(std::move(handler), channel.next_layer().get_executor()),
processor_(channel)
{
processor_.process_query_request(query);
}
std::optional<std::uint64_t> process_query_response(bool cont)
{
ResultsetType resultset;
error_code err;
auto num_fields = processor_.process_query_response(resultset, err);
if (!num_fields) // ok or err
{
complete(cont, err, std::move(resultset));
}
return num_fields;
}
void complete_with_fields(bool cont)
{
ResultsetType resultset;
std::move(processor_).create_resultset(resultset);
complete(cont, error_code(), std::move(resultset));
}
void operator()(
error_code err,
bool cont=true
)
{
std::optional<std::uint64_t> num_fields;
reenter(*this)
{
// The request message has already been composed in the ctor. Send it
yield processor_.channel().async_write(
boost::asio::buffer(processor_.buffer()),
std::move(*this)
);
if (err)
{
complete(cont, err, ResultsetType());
yield break;
}
// Read the response
yield processor_.channel().read(processor_.buffer(), std::move(*this));
if (err)
{
complete(cont, err, ResultsetType());
yield break;
}
// Response may be: ok_packet, err_packet, local infile request (TODO), or response with fields
num_fields = process_query_response(cont);
if (!num_fields)
{
// Not a response with fields. complete() already called
yield break;
}
// Read all of the field definitions
for (std::uint64_t i = 0; i < *num_fields; ++i)
{
// Read the field definition packet
yield processor_.channel().async_read(processor_.buffer(), std::move(*this));
if (!err)
{
// Process the message
err = processor_.process_field_definition();
}
if (err)
{
complete(cont, err, ResultsetType());
yield break;
}
}
// No EOF packet is expected here, as we require deprecate EOF capabilities
complete_with_fields(cont);
yield break;
}
}
};
Op(
std::move(initiator.completion_handler),
channel,
query
)(error_code(), false);
return initiator.result.get();
}
template <typename ChannelType, typename Allocator>
mysql::detail::fetch_result mysql::detail::fetch_text_row(
ChannelType& channel,
@@ -146,7 +323,7 @@ mysql::detail::fetch_result mysql::detail::fetch_text_row(
}
}
#include <boost/asio/unyield.hpp>
#endif /* INCLUDE_MYSQL_IMPL_QUERY_IMPL_HPP_ */