diff --git a/Makefile b/Makefile index bd3d49aa..2dbc20c1 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. -CXX = /opt/gcc-10.2.0/bin/g++-10.2.0 +#CXX = /opt/gcc-10.2.0/bin/g++-10.2.0 CPPFLAGS = CPPFLAGS += -g -O0 diff --git a/examples/example.cpp b/examples/example.cpp index 47b0e0cc..6cda3463 100644 --- a/examples/example.cpp +++ b/examples/example.cpp @@ -71,26 +71,35 @@ net::awaitable example2() } } -net::awaitable example3() +void example4() { - tcp_socket socket {co_await this_coro::executor}; + io_context ioc {1}; + auto ex = ioc.get_executor(); + tcp::resolver resv(ex); + auto const r = resv.resolve("127.0.0.1", "6379"); - sentinel_op2::config cfg - { {"127.0.0.1", "26379"} - , {"mymaster"} - , {"master"} - }; + tcp::socket socket {ex}; + net::connect(socket, r); - instance inst; - co_await async_get_instance2(socket, cfg, inst); + resp::pipeline p; + p.ping(); + + net::write(socket, buffer(p.payload)); + + resp::buffer buffer; + resp::response res; + boost::system::error_code ec; + resp::read(socket, buffer, res, ec); + resp::print(res.res); + ioc.run(); } int main() { + example4(); io_context ioc {1}; co_spawn(ioc, example1(), detached); co_spawn(ioc, example2(), detached); - co_spawn(ioc, example3(), detached); ioc.run(); } diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index 77f13b11..5952939a 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -270,7 +270,7 @@ public: return n; } - auto complete() const noexcept + auto done() const noexcept { return depth_ == 0 && bulk_ == bulk::none; } auto bulk() const noexcept @@ -317,49 +317,79 @@ public: return; } - // On a bulk read we can't read until delimiter since the payload - // may contain the delimiter itself so we have to read the whole - // chunk. However if the bulk blob is small enough it may be already - // on the buffer buf_ we read last time. If it is, there is not need - // of initiating another async op otherwise we have to read the - // missing bytes. + // On a bulk read we can't read until delimiter since the + // payload may contain the delimiter itself so we have to + // read the whole chunk. However if the bulk blob is small + // enough it may be already on the buffer buf_ we read + // last time. If it is, there is no need of initiating + // another async op otherwise we have to read the + // missing bytes. if (std::ssize(*buf_) < (parser_.bulk_length() + 2)) { start_ = 0; - // This is not compiling. - //net::async_read( - // stream_, - // net::dynamic_buffer(*buf_), - // std::move(self)); - - auto const size = std::ssize(*buf_); - auto const read_size = parser_.bulk_length() + 2 - std::ssize(*buf_); - buf_->resize(parser_.bulk_length() + 2); + auto const s = std::ssize(*buf_); + auto const l = parser_.bulk_length(); + buf_->resize(l + 2); net::async_read( stream_, - net::buffer(buf_->data() + size, read_size), + net::buffer(buf_->data() + s, l + 2 - s), net::transfer_all(), std::move(self)); return; } default: + { + // The condition below is wrong. it must be n < 3 for case 1 + // and n < 2 for the async_read. + if (ec || n < 3) + return self.complete(ec); - if (ec || n < 3) - return self.complete(ec); + n = parser_.advance(buf_->data(), n); - n = parser_.advance(buf_->data(), n); - - //print_command_raw(*buf_, n); - buf_->erase(0, n); - if (parser_.complete()) { - //std::cout << std::endl; - return self.complete({}); - } + buf_->erase(0, n); + if (parser_.done()) + return self.complete({}); + } } } } }; +template +auto read( + AsyncReadStream& stream, + resp::buffer& buf, + resp::response& res, + boost::system::error_code& ec) +{ + parser p {&res}; + std::size_t n = 0; + goto start; + do { + if (p.bulk() == parser::bulk::none) { +start: + n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec); + if (ec || n < 3) + return n; + } else { + auto const s = std::ssize(buf); + auto const l = p.bulk_length(); + if (s < (l + 2)) { + buf.resize(l + 2); + n = net::read(stream, net::buffer(buf.data() + s, l + 2 - s), net::transfer_all()); + if (ec || n < 2) + return n; + } + } + + n = p.advance(buf.data(), n); + + buf.erase(0, n); + } while (!p.done()); + + return n; +} + template < class AsyncReadStream, class CompletionToken =