From 68fe862fa9c3873a17ddc5d5cd92a3ef576b18e1 Mon Sep 17 00:00:00 2001 From: ruben Date: Mon, 3 Feb 2020 19:39:50 +0000 Subject: [PATCH] Generalized execute_query to generic_execute --- TODO.txt | 1 + .../network_algorithms/execute_generic.hpp | 35 +++ .../network_algorithms/execute_generic.ipp | 297 ++++++++++++++++++ .../impl/network_algorithms/execute_query.ipp | 273 +--------------- 4 files changed, 338 insertions(+), 268 deletions(-) create mode 100644 include/mysql/impl/network_algorithms/execute_generic.hpp create mode 100644 include/mysql/impl/network_algorithms/execute_generic.ipp diff --git a/TODO.txt b/TODO.txt index 4fcdb7be..fdb59831 100644 --- a/TODO.txt +++ b/TODO.txt @@ -34,3 +34,4 @@ Technical debt Integration test for network errors (e.g. host unreachable) Refactor file structucture for serialization and tests Deserialize common should be in namespace mysql::test, not in detail + Consider using a shared buffer in channel to avoid allocations \ No newline at end of file diff --git a/include/mysql/impl/network_algorithms/execute_generic.hpp b/include/mysql/impl/network_algorithms/execute_generic.hpp new file mode 100644 index 00000000..9c679752 --- /dev/null +++ b/include/mysql/impl/network_algorithms/execute_generic.hpp @@ -0,0 +1,35 @@ +#ifndef INCLUDE_MYSQL_IMPL_NETWORK_ALGORITHMS_READ_RESULTSET_HEAD_HPP_ +#define INCLUDE_MYSQL_IMPL_NETWORK_ALGORITHMS_READ_RESULTSET_HEAD_HPP_ + +#include "mysql/impl/network_algorithms/common.hpp" +#include "mysql/resultset.hpp" +#include + +namespace mysql +{ +namespace detail +{ + +template +void execute_generic( + channel& channel, + const Serializable& request, + resultset& output, + error_code& err, + error_info& info +); + +template +BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken, void(error_code, error_info, resultset)) +async_execute_generic( + channel& chan, + const Serializable& request, + CompletionToken&& token +); + +} +} + +#include "mysql/impl/network_algorithms/execute_generic.ipp" + +#endif /* INCLUDE_MYSQL_IMPL_NETWORK_ALGORITHMS_READ_RESULTSET_HEAD_HPP_ */ diff --git a/include/mysql/impl/network_algorithms/execute_generic.ipp b/include/mysql/impl/network_algorithms/execute_generic.ipp new file mode 100644 index 00000000..f8d9d2ec --- /dev/null +++ b/include/mysql/impl/network_algorithms/execute_generic.ipp @@ -0,0 +1,297 @@ +#ifndef INCLUDE_MYSQL_IMPL_NETWORK_ALGORITHMS_READ_RESULTSET_HEAD_IPP_ +#define INCLUDE_MYSQL_IMPL_NETWORK_ALGORITHMS_READ_RESULTSET_HEAD_IPP_ + +#include "mysql/impl/messages.hpp" +#include "mysql/impl/serialization.hpp" +#include +#include + +namespace mysql +{ +namespace detail +{ + +template +class execute_processor +{ + channel& channel_; + bytestring buffer_; + std::vector fields_; + std::vector field_buffers_; +public: + execute_processor(channel& chan): channel_(chan) {}; + + template + void process_request( + const Serializable& request + ) + { + // Serialize the request + capabilities caps = channel_.current_capabilities(); + serialize_message(request, caps, buffer_); + + // Prepare the channel + channel_.reset_sequence_number(); + } + + std::optional // has value if there are fields in the response + process_response( + resultset& output, + error_code& err, + error_info& info + ) + { + // Response may be: ok_packet, err_packet, local infile request (not implemented) + // If it is none of this, then the message type itself is the beginning of + // a length-encoded int containing the field count + DeserializationContext ctx (boost::asio::buffer(buffer_), channel_.current_capabilities()); + std::uint8_t msg_type; + std::tie(err, msg_type) = deserialize_message_type(ctx); + if (err) return {}; + if (msg_type == ok_packet_header) + { + ok_packet ok_packet; + err = deserialize_message(ok_packet, ctx); + if (err) return {}; + output = resultset(channel_, std::move(buffer_), ok_packet); + err.clear(); + return {}; + } + else if (msg_type == error_packet_header) + { + err = process_error_packet(ctx, info); + return {}; + } + else + { + // Resultset with metadata. First packet is an int_lenenc with + // the number of field definitions to expect. Message type is part + // of this packet, so we must rewind the context + ctx.rewind(1); + int_lenenc num_fields; + err = deserialize_message(num_fields, ctx); + if (err) return {}; + + fields_.reserve(num_fields.value); + field_buffers_.reserve(num_fields.value); + + return num_fields.value; + } + } + + error_code process_field_definition() + { + column_definition_packet field_definition; + DeserializationContext ctx (boost::asio::buffer(buffer_), channel_.current_capabilities()); + auto err = deserialize_message(field_definition, ctx); + if (err) return err; + + // Add it to our array + fields_.push_back(field_definition); + field_buffers_.push_back(std::move(buffer_)); + buffer_ = bytestring(); + + return error_code(); + } + + resultset create_resultset() && + { + return resultset( + channel_, + resultset_metadata(std::move(field_buffers_), std::move(fields_)) + ); + } + + auto& get_channel() { return channel_; } + auto& get_buffer() { return buffer_; } +}; + +} +} + +template +void mysql::detail::execute_generic( + channel& channel, + const Serializable& request, + resultset& output, + error_code& err, + error_info& info +) +{ + // Compose a com_query message, reset seq num + execute_processor processor (channel); + processor.process_request(request); + + // Send it + channel.write(boost::asio::buffer(processor.get_buffer()), err); + if (err) return; + + // Read the response + channel.read(processor.get_buffer(), err); + if (err) return; + + // Response may be: ok_packet, err_packet, local infile request (not implemented), or response with fields + auto num_fields = processor.process_response(output, err, info); + if (!num_fields) // ok or err + { + return; + } + + // We have a response with metadata, read all of the field definitions + for (std::uint64_t i = 0; i < *num_fields; ++i) + { + // Read the field definition packet + channel.read(processor.get_buffer(), err); + if (err) return; + + // Process the message + err = processor.process_field_definition(); + if (err) return; + } + + // No EOF packet is expected here, as we require deprecate EOF capabilities + output = std::move(processor).create_resultset(); + err.clear(); +} + + +template +BOOST_ASIO_INITFN_RESULT_TYPE( + CompletionToken, + void(mysql::error_code, mysql::error_info, mysql::resultset) +) +mysql::detail::async_execute_generic( + channel& chan, + const Serializable& request, + CompletionToken&& token +) +{ + using HandlerSignature = void(error_code, error_info, resultset); + using HandlerType = BOOST_ASIO_HANDLER_TYPE(CompletionToken, HandlerSignature); + using BaseType = boost::beast::async_base; + using ResultsetType = resultset; + + boost::asio::async_completion initiator(token); + + struct Op: BaseType, boost::asio::coroutine + { + std::shared_ptr> processor_; + std::uint64_t remaining_fields_ {0}; + + Op( + HandlerType&& handler, + channel& channel, + const Serializable& request + ): + BaseType(std::move(handler), channel.next_layer().get_executor()), + processor_(std::make_shared>(channel)) + { + processor_->process_request(request); + } + + // true => has fields (must continue reading) + bool process_response(bool cont) + { + ResultsetType resultset; + error_code err; + error_info info; + auto num_fields = processor_->process_response(resultset, err, info); + if (!num_fields) // ok or err + { + this->complete(cont, err, std::move(info), std::move(resultset)); + return false; + } + else + { + remaining_fields_ = *num_fields; + return true; + } + } + + void complete_with_fields(bool cont) + { + this->complete( + cont, + error_code(), + error_info(), + std::move(*processor_).create_resultset() + ); + } + + void operator()( + error_code err, + bool cont=true + ) + { + reenter(*this) + { + // The request message has already been composed in the ctor. Send it + yield processor_->get_channel().async_write( + boost::asio::buffer(processor_->get_buffer()), + std::move(*this) + ); + if (err) + { + this->complete(cont, err, error_info(), ResultsetType()); + yield break; + } + + // Read the response + yield processor_->get_channel().async_read( + processor_->get_buffer(), + std::move(*this) + ); + if (err) + { + this->complete(cont, err, error_info(), ResultsetType()); + yield break; + } + + // Response may be: ok_packet, err_packet, local infile request (not implemented), or response with fields + if (!process_response(cont)) + { + // Not a response with fields. complete() already called + yield break; + } + + // Read all of the field definitions + while (remaining_fields_ > 0) + { + // Read the field definition packet + yield processor_->get_channel().async_read( + processor_->get_buffer(), + std::move(*this) + ); + if (!err) + { + // Process the message + err = processor_->process_field_definition(); + } + + if (err) + { + this->complete(cont, err, error_info(), ResultsetType()); + yield break; + } + + remaining_fields_--; + } + + // No EOF packet is expected here, as we require deprecate EOF capabilities + complete_with_fields(cont); + yield break; + } + } + }; + + Op( + std::move(initiator.completion_handler), + chan, + request + )(error_code(), false); + return initiator.result.get(); +} + +#include + +#endif /* INCLUDE_MYSQL_IMPL_NETWORK_ALGORITHMS_READ_RESULTSET_HEAD_IPP_ */ diff --git a/include/mysql/impl/network_algorithms/execute_query.ipp b/include/mysql/impl/network_algorithms/execute_query.ipp index 5c68fa91..7f4e7f38 100644 --- a/include/mysql/impl/network_algorithms/execute_query.ipp +++ b/include/mysql/impl/network_algorithms/execute_query.ipp @@ -1,116 +1,7 @@ #ifndef INCLUDE_MYSQL_IMPL_NETWORK_ALGORITHMS_EXECUTE_QUERY_IPP_ #define INCLUDE_MYSQL_IMPL_NETWORK_ALGORITHMS_EXECUTE_QUERY_IPP_ -#include "mysql/impl/messages.hpp" -#include "mysql/impl/serialization.hpp" -#include -#include - -namespace mysql -{ -namespace detail -{ - - -template -class query_processor -{ - channel& channel_; - bytestring buffer_; - std::vector fields_; - std::vector field_buffers_; -public: - query_processor(channel& chan): channel_(chan) {}; - void process_query_request( - std::string_view query - ) - { - // Compose a com_query message - com_query_packet query_msg; - query_msg.query.value = query; - - // Serialize it - capabilities caps = channel_.current_capabilities(); - serialize_message(query_msg, caps, buffer_); - - // Prepare the channel - channel_.reset_sequence_number(); - } - - std::optional // has value if there are fields in the response - process_query_response( - resultset& output, - error_code& err, - error_info& info - ) - { - // Response may be: ok_packet, err_packet, local infile request (not implemented) - // If it is none of this, then the message type itself is the beginning of - // a length-encoded int containing the field count - DeserializationContext ctx (boost::asio::buffer(buffer_), channel_.current_capabilities()); - std::uint8_t msg_type; - std::tie(err, msg_type) = deserialize_message_type(ctx); - if (err) return {}; - if (msg_type == ok_packet_header) - { - ok_packet ok_packet; - err = deserialize_message(ok_packet, ctx); - if (err) return {}; - output = resultset(channel_, std::move(buffer_), ok_packet); - err.clear(); - return {}; - } - else if (msg_type == error_packet_header) - { - err = process_error_packet(ctx, info); - return {}; - } - else - { - // Resultset with metadata. First packet is an int_lenenc with - // the number of field definitions to expect. Message type is part - // of this packet, so we must rewind the context - ctx.rewind(1); - int_lenenc num_fields; - err = deserialize_message(num_fields, ctx); - if (err) return {}; - - fields_.reserve(num_fields.value); - field_buffers_.reserve(num_fields.value); - - return num_fields.value; - } - } - - error_code process_field_definition() - { - column_definition_packet field_definition; - DeserializationContext ctx (boost::asio::buffer(buffer_), channel_.current_capabilities()); - auto err = deserialize_message(field_definition, ctx); - if (err) return err; - - // Add it to our array - fields_.push_back(field_definition); - field_buffers_.push_back(std::move(buffer_)); - buffer_ = bytestring(); - - return error_code(); - } - - resultset create_resultset() && - { - return resultset( - channel_, - resultset_metadata(std::move(field_buffers_), std::move(fields_)) - ); - } - - auto& get_channel() { return channel_; } - auto& get_buffer() { return buffer_; } -}; - -} -} +#include "mysql/impl/network_algorithms/execute_generic.hpp" template void mysql::detail::execute_query( @@ -121,40 +12,8 @@ void mysql::detail::execute_query( error_info& info ) { - // Compose a com_query message, reset seq num - query_processor processor (channel); - processor.process_query_request(query); - - // Send it - channel.write(boost::asio::buffer(processor.get_buffer()), err); - if (err) return; - - // Read the response - channel.read(processor.get_buffer(), err); - if (err) return; - - // Response may be: ok_packet, err_packet, local infile request (not implemented), or response with fields - auto num_fields = processor.process_query_response(output, err, info); - if (!num_fields) // ok or err - { - return; - } - - // We have a response with metadata, read all of the field definitions - for (std::uint64_t i = 0; i < *num_fields; ++i) - { - // Read the field definition packet - channel.read(processor.get_buffer(), err); - if (err) return; - - // Process the message - err = processor.process_field_definition(); - if (err) return; - } - - // No EOF packet is expected here, as we require deprecate EOF capabilities - output = std::move(processor).create_resultset(); - err.clear(); + com_query_packet request { string_eof(query) }; + execute_generic(channel, request, output, err, info); } @@ -169,130 +28,8 @@ mysql::detail::async_execute_query( CompletionToken&& token ) { - using HandlerSignature = void(error_code, error_info, resultset); - using HandlerType = BOOST_ASIO_HANDLER_TYPE(CompletionToken, HandlerSignature); - using BaseType = boost::beast::async_base; - using ResultsetType = resultset; - - boost::asio::async_completion initiator(token); - - struct Op: BaseType, boost::asio::coroutine - { - std::shared_ptr> processor_; - std::uint64_t remaining_fields_ {0}; - - Op( - HandlerType&& handler, - channel& channel, - std::string_view query - ): - BaseType(std::move(handler), channel.next_layer().get_executor()), - processor_(std::make_shared>(channel)) - { - processor_->process_query_request(query); - } - - // true => has fields (must continue reading) - bool process_query_response(bool cont) - { - ResultsetType resultset; - error_code err; - error_info info; - auto num_fields = processor_->process_query_response(resultset, err, info); - if (!num_fields) // ok or err - { - this->complete(cont, err, std::move(info), std::move(resultset)); - return false; - } - else - { - remaining_fields_ = *num_fields; - return true; - } - } - - void complete_with_fields(bool cont) - { - this->complete( - cont, - error_code(), - error_info(), - std::move(*processor_).create_resultset() - ); - } - - void operator()( - error_code err, - bool cont=true - ) - { - reenter(*this) - { - // The request message has already been composed in the ctor. Send it - yield processor_->get_channel().async_write( - boost::asio::buffer(processor_->get_buffer()), - std::move(*this) - ); - if (err) - { - this->complete(cont, err, error_info(), ResultsetType()); - yield break; - } - - // Read the response - yield processor_->get_channel().async_read( - processor_->get_buffer(), - std::move(*this) - ); - if (err) - { - this->complete(cont, err, error_info(), ResultsetType()); - yield break; - } - - // Response may be: ok_packet, err_packet, local infile request (not implemented), or response with fields - if (!process_query_response(cont)) - { - // Not a response with fields. complete() already called - yield break; - } - - // Read all of the field definitions - while (remaining_fields_ > 0) - { - // Read the field definition packet - yield processor_->get_channel().async_read( - processor_->get_buffer(), - std::move(*this) - ); - if (!err) - { - // Process the message - err = processor_->process_field_definition(); - } - - if (err) - { - this->complete(cont, err, error_info(), ResultsetType()); - yield break; - } - - remaining_fields_--; - } - - // No EOF packet is expected here, as we require deprecate EOF capabilities - complete_with_fields(cont); - yield break; - } - } - }; - - Op( - std::move(initiator.completion_handler), - chan, - query - )(error_code(), false); - return initiator.result.get(); + com_query_packet request { string_eof(query) }; + return async_execute_generic(chan, request, std::forward(token)); } #include