From 20f00e4c0e8fa4da63aef3b3582cc5b031501160 Mon Sep 17 00:00:00 2001 From: ruben Date: Wed, 28 Aug 2019 08:02:12 +0100 Subject: [PATCH] Added cursor/fetch support to prepared statements --- include/message_serialization.hpp | 1 + include/messages.hpp | 12 ++++++++++++ include/prepared_statement.hpp | 28 +++++++++++++++++++++------- main.cpp | 8 ++++---- src/message_serialization.cpp | 7 +++++++ src/prepared_statement.cpp | 31 +++++++++++++++++++++++++------ 6 files changed, 70 insertions(+), 17 deletions(-) diff --git a/include/message_serialization.hpp b/include/message_serialization.hpp index 2b0ab6dc..8d8227bc 100644 --- a/include/message_serialization.hpp +++ b/include/message_serialization.hpp @@ -29,6 +29,7 @@ ReadIterator deserialize(ReadIterator from, ReadIterator last, StmtPrepareRespon void serialize(DynamicBuffer& buffer, const StmtExecute& value); ReadIterator deserialize(ReadIterator from, ReadIterator last, StmtExecuteResponseHeader& output); std::pair compute_field_type(const BinaryValue&); +void serialize(DynamicBuffer& buffer, const StmtFetch& value); // Text serialization std::ostream& operator<<(std::ostream& os, const Handshake& value); diff --git a/include/messages.hpp b/include/messages.hpp index 46c82324..cd123d32 100644 --- a/include/messages.hpp +++ b/include/messages.hpp @@ -195,6 +195,11 @@ struct ColumnDefinition }; // Prepared statements +constexpr int1 CURSOR_TYPE_NO_CURSOR = 0; +constexpr int1 CURSOR_TYPE_READ_ONLY = 1; +constexpr int1 CURSOR_TYPE_FOR_UPDATE = 2; +constexpr int1 CURSOR_TYPE_SCROLLABLE = 4; + struct StmtPrepare { string_eof statement; @@ -241,6 +246,13 @@ struct StmtExecuteResponseHeader int1 num_fields; }; +struct StmtFetch +{ + // int1 message_type: COM_STMT_FETCH + int4 statement_id; + int4 rows_to_fetch; +}; + } diff --git a/include/prepared_statement.hpp b/include/prepared_statement.hpp index 15d824bc..202a3ec3 100644 --- a/include/prepared_statement.hpp +++ b/include/prepared_statement.hpp @@ -5,6 +5,7 @@ #include "mysql_stream.hpp" #include #include +#include namespace mysql { @@ -21,6 +22,8 @@ class BinaryResultset enum class State { initial, data_available, exhausted }; MysqlStream* stream_; + int4 statement_id_; + int4 fetch_count_; std::vector fields_; std::vector current_packet_; std::vector current_values_; @@ -29,9 +32,11 @@ class BinaryResultset void read_metadata(); void process_ok(); + void send_fetch(); + bool cursor_exists() const { return ok_packet_.status_flags & SERVER_STATUS_CURSOR_EXISTS; } public: - BinaryResultset(MysqlStream& stream): - stream_ {&stream}, ok_packet_ {}, + BinaryResultset(MysqlStream& stream, int4 stmt_id, int4 fetch_count): + stream_ {&stream}, statement_id_ {stmt_id}, fetch_count_ {fetch_count}, ok_packet_ {}, state_ {State::initial} { read_metadata(); }; BinaryResultset(const BinaryResultset&) = delete; BinaryResultset(BinaryResultset&&) = default; @@ -51,8 +56,10 @@ class PreparedStatement std::vector params_; std::vector columns_; - BinaryResultset do_execute(const StmtExecute& message); + BinaryResultset do_execute(const StmtExecute& message, int4 fetch_count); public: + static constexpr int4 MAX_FETCH_COUNT = std::numeric_limits::max(); + PreparedStatement(MysqlStream& stream, int4 statement_id, std::vector&& params, std::vector&& columns); PreparedStatement(const PreparedStatement&) = delete; @@ -67,7 +74,10 @@ public: const std::vector& columns() const { return columns_; } template - BinaryResultset execute(Params&&... params); + BinaryResultset execute(Params&&... params) { return execute_with_cursor(MAX_FETCH_COUNT, std::forward(params)...); } + + template + BinaryResultset execute_with_cursor(int4 fetch_count, Params&&... params); // close(Connection) // Destructor should try to auto-close @@ -114,15 +124,19 @@ void fill_execute_msg(StmtExecute& output, std::size_t num_params, Args&&... arg template -mysql::BinaryResultset mysql::PreparedStatement::execute(Params&&... actual_params) +mysql::BinaryResultset mysql::PreparedStatement::execute_with_cursor( + int4 fetch_count, + Params&&... actual_params +) { + int1 flags = fetch_count == MAX_FETCH_COUNT ? CURSOR_TYPE_NO_CURSOR : CURSOR_TYPE_READ_ONLY; StmtExecute message { statement_id_, - 0 // Cursor type: no cursor. TODO: allow execution with different cursor types + flags }; detail::fill_execute_msg(message, params_.size(), std::forward(actual_params)...); - return do_execute(message); + return do_execute(message, fetch_count); } diff --git a/main.cpp b/main.cpp index 1f617f43..15830641 100644 --- a/main.cpp +++ b/main.cpp @@ -72,12 +72,12 @@ int main() // Prepare a statement auto stmt = mysql::PreparedStatement::prepare( stream, "SELECT * from users WHERE age < ? and first_name <> ?"); - auto res = stmt.execute(40, string_lenenc{"hola"}); + auto res = stmt.execute_with_cursor(2, 200, string_lenenc{"hola"}); print(res); - auto make_older = mysql::PreparedStatement::prepare(stream, "UPDATE users SET age = age + 1"); + /*auto make_older = mysql::PreparedStatement::prepare(stream, "UPDATE users SET age = age + 1"); res = make_older.execute(); print(res); - res = stmt.execute(40, string_lenenc{"hola"}); + res = stmt.execute_with_cursor(2, 40, string_lenenc{"hola"}); cout << "\n\n"; - print(res); + print(res);*/ } diff --git a/src/message_serialization.cpp b/src/message_serialization.cpp index 94249109..e19e203c 100644 --- a/src/message_serialization.cpp +++ b/src/message_serialization.cpp @@ -210,6 +210,13 @@ std::pair mysql::compute_field_type(const BinaryValue& v }, v); } +void mysql::serialize(DynamicBuffer& buffer, const StmtFetch& value) +{ + serialize(buffer, Command::COM_STMT_FETCH); + serialize(buffer, value.statement_id); + serialize(buffer, value.rows_to_fetch); +} + // Text serialization std::ostream& mysql::operator<<(std::ostream& os, const Handshake& value) { diff --git a/src/prepared_statement.cpp b/src/prepared_statement.cpp index 6c9424f3..b7557990 100644 --- a/src/prepared_statement.cpp +++ b/src/prepared_statement.cpp @@ -173,11 +173,25 @@ void mysql::BinaryResultset::process_ok() deserialize(current_packet_.data() + 1, current_packet_.data() + current_packet_.size(), ok_packet_); - if (ok_packet_.status_flags & SERVER_STATUS_CURSOR_EXISTS) + if (cursor_exists() && + !(ok_packet_.status_flags & SERVER_STATUS_LAST_ROW_SENT)) { - // TODO: handle cursor semantics + send_fetch(); + retrieve_next(); } - state_ = State::exhausted; + else + { + state_ = State::exhausted; + } +} + +void mysql::BinaryResultset::send_fetch() +{ + mysql::StmtFetch msg { statement_id_, fetch_count_ }; + DynamicBuffer buffer; + serialize(buffer, msg); + stream_->reset_sequence_number(); + stream_->write(buffer.get()); } bool mysql::BinaryResultset::retrieve_next() @@ -201,7 +215,9 @@ bool mysql::BinaryResultset::retrieve_next() const mysql::OkPacket& mysql::BinaryResultset::ok_packet() const { - assert(state_ == State::exhausted); + // TODO: fetch semantics are not aligned with this assertion + assert(state_ == State::exhausted || + (state_ == State::data_available && cursor_exists())); return ok_packet_; } @@ -211,7 +227,10 @@ const std::vector& mysql::BinaryResultset::values() const return current_values_; } -mysql::BinaryResultset mysql::PreparedStatement::do_execute(const StmtExecute& message) +mysql::BinaryResultset mysql::PreparedStatement::do_execute( + const StmtExecute& message, + int4 fetch_count +) { std::vector read_buffer; @@ -221,7 +240,7 @@ mysql::BinaryResultset mysql::PreparedStatement::do_execute(const StmtExecute& m stream_->reset_sequence_number(); stream_->write(write_buffer.get()); - return mysql::BinaryResultset {*stream_}; + return mysql::BinaryResultset {*stream_, statement_id_, fetch_count}; }