diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 92a8b44c..cca9551a 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -6,50 +6,75 @@ */ #include -#include using namespace aedis; -void print_helper(command cmd, resp3::type type, buffers& buf) +void print_helper(command cmd, resp3::type type, buffers& bufs) { + std::cout << cmd << " (" << type << "): "; + switch (type) { - case resp3::type::simple_string: - { - std::cout << cmd << " " << buf.simple_string << " (" << type << ")" << std::endl; - } break; - case resp3::type::push: - case resp3::type::map: - { - std::cout << cmd << " (" << type << ")" << std::endl; - } break; - default:{} + case resp3::type::simple_string: std::cout << bufs.simple_string; break; + case resp3::type::blob_string: std::cout << bufs.blob_string; break; + case resp3::type::number: std::cout << bufs.number; break; + default: + std::cout << "Unexpected." << "\n"; } + + std::cout << "\n"; } -struct myreceiver { - std::shared_ptr conn; - buffers& buf; +net::awaitable +reader(net::ip::tcp::socket& socket, std::queue& reqs) +{ + buffers bufs; + std::string buffer; - void operator()(command cmd, resp3::type type) const - { - if (cmd == command::hello) { - assert(type == resp3::type::map); - conn->ping(); - conn->psubscribe({"aaa*"}); - conn->quit(); + detail::prepare_queue(reqs); + reqs.back().hello("3"); + + co_await async_write(socket, net::buffer(reqs.back().payload), net::use_awaitable); + + detail::response_adapters adapters{bufs}; + for (;;) { + auto const event = co_await detail::async_consume(socket, buffer, adapters, reqs); + + switch (event.first) { + case command::hello: + { + auto const empty = detail::prepare_queue(reqs); + reqs.back().ping(); + reqs.back().incr("a"); + reqs.back().set("b", {"Some string"}); + reqs.back().get("b"); + reqs.back().quit(); + if (empty) + co_await detail::async_write_all(socket, reqs); + } break; + case command::get: + case command::incr: + case command::quit: + case command::set: + case command::ping: + print_helper(event.first, event.second, bufs); + break; + default: { + std::cout << "PUSH notification (" << event.second << ")" << std::endl; + } } - - print_helper(cmd, type, buf); } -}; +} int main() { - net::io_context ioc {1}; - auto conn = std::make_shared(ioc.get_executor()); + net::io_context ioc; - buffers bufs; - myreceiver recv{conn, bufs}; - conn->run(std::ref(recv), bufs); + net::ip::tcp::socket socket{ioc}; + net::ip::tcp::resolver resolver{ioc}; + auto const res = resolver.resolve("127.0.0.1", "6379"); + net::connect(socket, res); + + std::queue reqs; + co_spawn(ioc, reader(socket, reqs), net::detached); ioc.run(); } diff --git a/include/aedis/aedis.hpp b/include/aedis/aedis.hpp index 907cf9e7..131b7ec7 100644 --- a/include/aedis/aedis.hpp +++ b/include/aedis/aedis.hpp @@ -11,3 +11,4 @@ #include #include #include +#include diff --git a/include/aedis/connection.hpp b/include/aedis/connection.hpp index 5b6ab7ac..2ade9681 100644 --- a/include/aedis/connection.hpp +++ b/include/aedis/connection.hpp @@ -49,7 +49,6 @@ private: std::string buffer_; std::queue reqs_; config conf_; - boost::system::error_code ec_; template net::awaitable worker_coro(Receiver receiver, buffers& bufs) @@ -117,7 +116,7 @@ public: if (empty) { co_spawn( socket_.get_executor(), - detail::async_write_all(socket_, reqs_, ec_), + detail::async_write_all(socket_, reqs_), net::detached); } diff --git a/include/aedis/detail/write.hpp b/include/aedis/detail/write.hpp index 5dcaa4ed..fcebc7ed 100644 --- a/include/aedis/detail/write.hpp +++ b/include/aedis/detail/write.hpp @@ -51,28 +51,33 @@ template net::awaitable async_write_all( AsyncReadWriteStream& socket, - std::queue& reqs, - boost::system::error_code& ec) + std::queue& reqs) { // Commands like unsubscribe have a push response so we do not // have to wait for a response before sending a new pipeline. while (!std::empty(reqs)) { auto buffer = net::buffer(reqs.front().payload); - co_await async_write( - socket, - buffer, - net::redirect_error(net::use_awaitable, ec)); - - if (ec) { - co_return; - } - + co_await async_write(socket, buffer, net::use_awaitable); if (!std::empty(reqs.front().cmds)) break; - reqs.pop(); } } +inline +bool prepare_queue(std::queue& reqs, int max_cmds = 5000) +{ + auto const empty = std::empty(reqs); + if (empty || std::size(reqs) == 1) { + reqs.push({}); + return empty; + } + + if (std::ssize(reqs.back()) > max_cmds) + reqs.push({}); + + return false; +} + } // detail } // aedis diff --git a/include/aedis/detail/utils.hpp b/include/aedis/utils.hpp similarity index 93% rename from include/aedis/detail/utils.hpp rename to include/aedis/utils.hpp index a67fa17f..27495059 100644 --- a/include/aedis/detail/utils.hpp +++ b/include/aedis/utils.hpp @@ -9,6 +9,10 @@ #include +#include "type.hpp" +#include "command.hpp" +#include "buffers.hpp" + namespace aedis { template