From a8299169c1eacfdb5bb54b54990affa4f7d34665 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Thu, 31 Dec 2020 22:39:44 +0100 Subject: [PATCH] Adds write functions. --- README.md | 28 +++++++------- examples/async_all_hashes.cpp | 24 ++++++------ examples/async_basic.cpp | 2 +- examples/async_events.cpp | 18 ++++----- examples/async_pubsub.cpp | 28 +++++++------- examples/async_reconnect.cpp | 6 +-- examples/sync_basic.cpp | 2 +- examples/sync_events.cpp | 20 +++++----- examples/sync_responses.cpp | 12 +++--- include/aedis/request.hpp | 1 + include/aedis/write.hpp | 73 +++++++++++++++++++++++++++++++++++ 11 files changed, 142 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index 427e8161..77cabcd0 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ int main() tcp::resolver resv(ioc); tcp::socket socket {ioc}; net::connect(socket, resv.resolve("127.0.0.1", "6379")); - net::write(socket, net::buffer(req.payload)); + resp::write(socket, req); std::string buffer; for (;;) { @@ -45,13 +45,11 @@ int main() { resp::response_flat_map res; resp::read(socket, buffer, res); - print(res.result); } break; case resp::command::get: { resp::response_blob_string res; resp::read(socket, buffer, res); - std::cout << "get: " << res.result << std::endl; } break; default: { @@ -127,22 +125,22 @@ and pass it as argument to the request as follows net::awaitable example() { try { - resp::request p; - p.rpush("list", {1, 2, 3}); - p.lrange("list", 0, -1, myevents::interesting1); - p.sadd("set", std::set{3, 4, 5}); - p.smembers("set", myevents::interesting2); - p.quit(); + resp::request req; + req.rpush("list", {1, 2, 3}); + req.lrange("list", 0, -1, myevents::interesting1); + req.sadd("set", std::set{3, 4, 5}); + req.smembers("set", myevents::interesting2); + req.quit(); auto ex = co_await this_coro::executor; tcp::resolver resv(ex); tcp_socket socket {ex}; co_await net::async_connect(socket, resv.resolve("127.0.0.1", "6379")); - co_await net::async_write(socket, net::buffer(p.payload)); + co_await resp::async_write(socket, req); std::string buffer; for (;;) { - switch (p.events.front().second) { + switch (req.events.front().second) { case myevents::interesting1: { resp::response_list res; @@ -161,7 +159,7 @@ net::awaitable example() co_await resp::async_read(socket, buffer, res); } } - p.events.pop(); + req.events.pop(); } } catch (std::exception const& e) { std::cerr << e.what() << std::endl; @@ -187,14 +185,14 @@ net::awaitable example1() auto ex = co_await this_coro::executor; for (;;) { try { - resp::request p; - p.quit(); + resp::request req; + req.quit(); tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_await async_write(socket, net::buffer(p.payload)); + co_await async_write(socket, req); std::string buffer; for (;;) { diff --git a/examples/async_all_hashes.cpp b/examples/async_all_hashes.cpp index 89f83a4f..c1f08056 100644 --- a/examples/async_all_hashes.cpp +++ b/examples/async_all_hashes.cpp @@ -48,20 +48,20 @@ auto make_hset_arg(foo const& p) net::awaitable create_hashes() { std::vector posts(20000); - resp::request p; - p.flushall(); + resp::request req; + req.flushall(); for (auto i = 0; i < std::ssize(posts); ++i) { std::string const name = "posts:" + std::to_string(i); - p.hset(name, make_hset_arg(posts[i])); + req.hset(name, make_hset_arg(posts[i])); } - p.quit(); + req.quit(); auto ex = co_await this_coro::executor; tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_await async_write(socket, buffer(p.payload)); + co_await async_write(socket, req); std::string buffer; resp::response_ignore res; @@ -71,15 +71,15 @@ net::awaitable create_hashes() net::awaitable read_hashes_coro() { - resp::request p; - p.keys("posts:*"); + resp::request req; + req.keys("posts:*"); auto ex = co_await this_coro::executor; tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_await async_write(socket, net::buffer(p.payload)); + co_await async_write(socket, req); std::string buffer; @@ -107,14 +107,14 @@ net::awaitable read_hashes_coro() void read_hashes(net::io_context& ioc) { - resp::request p; - p.keys("posts:*"); + resp::request req; + req.keys("posts:*"); tcp::resolver resv(ioc); auto const r = resv.resolve("127.0.0.1", "6379"); tcp::socket socket {ioc}; net::connect(socket, r); - net::write(socket, net::buffer(p.payload)); + resp::write(socket, req); std::string buffer; @@ -127,7 +127,7 @@ void read_hashes(net::io_context& ioc) pv.hvals(o); pv.quit(); - net::write(socket, net::buffer(pv.payload)); + write(socket, pv); for (auto const& key : keys.result) { resp::response_array value; diff --git a/examples/async_basic.cpp b/examples/async_basic.cpp index 9b823a80..22473014 100644 --- a/examples/async_basic.cpp +++ b/examples/async_basic.cpp @@ -27,7 +27,7 @@ net::awaitable example1() auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_await async_write(socket, net::buffer(req.payload)); + co_await async_write(socket, req); std::string buffer; for (;;) { diff --git a/examples/async_events.cpp b/examples/async_events.cpp index cb9659fd..7cf4c168 100644 --- a/examples/async_events.cpp +++ b/examples/async_events.cpp @@ -22,22 +22,22 @@ enum class myevents net::awaitable example() { try { - resp::request p; - p.rpush("list", {1, 2, 3}); - p.lrange("list", 0, -1, myevents::interesting1); - p.sadd("set", std::set{3, 4, 5}); - p.smembers("set", myevents::interesting2); - p.quit(); + resp::request req; + req.rpush("list", {1, 2, 3}); + req.lrange("list", 0, -1, myevents::interesting1); + req.sadd("set", std::set{3, 4, 5}); + req.smembers("set", myevents::interesting2); + req.quit(); auto ex = co_await this_coro::executor; tcp::resolver resv(ex); tcp_socket socket {ex}; co_await net::async_connect(socket, resv.resolve("127.0.0.1", "6379")); - co_await net::async_write(socket, net::buffer(p.payload)); + co_await resp::async_write(socket, req); std::string buffer; for (;;) { - switch (p.events.front().second) { + switch (req.events.front().second) { case myevents::interesting1: { resp::response_list res; @@ -56,7 +56,7 @@ net::awaitable example() co_await resp::async_read(socket, buffer, res); } } - p.events.pop(); + req.events.pop(); } } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp index 1f1db91a..ebcea046 100644 --- a/examples/async_pubsub.cpp +++ b/examples/async_pubsub.cpp @@ -9,15 +9,13 @@ #include namespace net = aedis::net; +namespace this_coro = net::this_coro; + +using namespace aedis; using tcp = net::ip::tcp; using tcp_socket = net::use_awaitable_t<>::as_default_on_t; using stimer = net::use_awaitable_t<>::as_default_on_t; -namespace this_coro = net::this_coro; - -using namespace net; -using namespace aedis; - net::awaitable publisher() { auto ex = co_await this_coro::executor; @@ -29,10 +27,10 @@ net::awaitable publisher() std::string buffer; for (;;) { - resp::request p; - p.hello(); - p.publish("channel", "12345"); - co_await async_write(socket, net::buffer(p.payload)); + resp::request req; + req.hello(); + req.publish("channel", "12345"); + co_await async_write(socket, req); resp::response_ignore res; co_await resp::async_read(socket, buffer, res); co_await resp::async_read(socket, buffer, res); @@ -49,14 +47,14 @@ net::awaitable subscriber() { auto ex = co_await this_coro::executor; try { - resp::request p; - p.subscribe("channel"); + resp::request req; + req.subscribe("channel"); tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_await async_write(socket, buffer(p.payload)); + co_await async_write(socket, req); std::string buffer; @@ -77,9 +75,9 @@ net::awaitable subscriber() int main() { - io_context ioc {1}; - co_spawn(ioc, publisher(), detached); - co_spawn(ioc, subscriber(), detached); + net::io_context ioc {1}; + co_spawn(ioc, publisher(), net::detached); + co_spawn(ioc, subscriber(), net::detached); ioc.run(); } diff --git a/examples/async_reconnect.cpp b/examples/async_reconnect.cpp index 613ea8c2..b65074c0 100644 --- a/examples/async_reconnect.cpp +++ b/examples/async_reconnect.cpp @@ -20,14 +20,14 @@ net::awaitable example1() auto ex = co_await this_coro::executor; for (;;) { try { - resp::request p; - p.quit(); + resp::request req; + req.quit(); tcp::resolver resv(ex); auto const r = resv.resolve("127.0.0.1", "6379"); tcp_socket socket {ex}; co_await async_connect(socket, r); - co_await async_write(socket, net::buffer(p.payload)); + co_await async_write(socket, req); std::string buffer; for (;;) { diff --git a/examples/sync_basic.cpp b/examples/sync_basic.cpp index de0e9927..e95fa925 100644 --- a/examples/sync_basic.cpp +++ b/examples/sync_basic.cpp @@ -23,7 +23,7 @@ int main() tcp::resolver resv(ioc); tcp::socket socket {ioc}; net::connect(socket, resv.resolve("127.0.0.1", "6379")); - net::write(socket, net::buffer(req.payload)); + resp::write(socket, req); std::string buffer; for (;;) { diff --git a/examples/sync_events.cpp b/examples/sync_events.cpp index d32ad5d0..2d2bd478 100644 --- a/examples/sync_events.cpp +++ b/examples/sync_events.cpp @@ -21,23 +21,23 @@ enum class myevents int main() { try { - resp::request p; - p.hello(); - p.rpush("list", {1, 2, 3}); - p.lrange("list", 0, -1, myevents::interesting1); - p.sadd("set", std::set{3, 4, 5}); - p.smembers("set", myevents::interesting2); - p.quit(); + resp::request req; + req.hello(); + req.rpush("list", {1, 2, 3}); + req.lrange("list", 0, -1, myevents::interesting1); + req.sadd("set", std::set{3, 4, 5}); + req.smembers("set", myevents::interesting2); + req.quit(); io_context ioc {1}; tcp::resolver resv(ioc); tcp::socket socket {ioc}; net::connect(socket, resv.resolve("127.0.0.1", "6379")); - net::write(socket, buffer(p.payload)); + resp::write(socket, req); std::string buffer; for (;;) { - switch (p.events.front().second) { + switch (req.events.front().second) { case myevents::interesting1: { resp::response_list res; @@ -56,7 +56,7 @@ int main() resp::read(socket, buffer, res); } } - p.events.pop(); + req.events.pop(); } } catch (std::exception const& e) { std::cerr << e.what() << std::endl; diff --git a/examples/sync_responses.cpp b/examples/sync_responses.cpp index d88842d7..73eaef49 100644 --- a/examples/sync_responses.cpp +++ b/examples/sync_responses.cpp @@ -15,17 +15,17 @@ using namespace aedis; int main() { try { - resp::request p; - p.hello(); - p.rpush("list", {1, 2, 3}); - p.lrange("list"); - p.quit(); + resp::request req; + req.hello(); + req.rpush("list", {1, 2, 3}); + req.lrange("list"); + req.quit(); io_context ioc {1}; tcp::resolver resv(ioc); tcp::socket socket {ioc}; net::connect(socket, resv.resolve("127.0.0.1", "6379")); - net::write(socket, buffer(p.payload)); + resp::write(socket, req); std::string buffer; diff --git a/include/aedis/request.hpp b/include/aedis/request.hpp index f1aca977..1b90617d 100644 --- a/include/aedis/request.hpp +++ b/include/aedis/request.hpp @@ -15,6 +15,7 @@ #include #include +#include namespace aedis { namespace resp { diff --git a/include/aedis/write.hpp b/include/aedis/write.hpp index e69de29b..85d2944c 100644 --- a/include/aedis/write.hpp +++ b/include/aedis/write.hpp @@ -0,0 +1,73 @@ +/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#pragma once + +#include +#include + +#include + +namespace aedis { namespace resp { + +template< + class SyncWriteStream, + class Event> +std::size_t +write( + SyncWriteStream& stream, + request& req, + boost::system::error_code& ec) +{ + static_assert(boost::beast::is_sync_write_stream::value, + "SyncWriteStream type requirements not met"); + + return write(stream, net::buffer(req.payload), ec); +} + +template< + class SyncWriteStream, + class Event> +std::size_t +write( + SyncWriteStream& stream, + request& req) +{ + static_assert(boost::beast::is_sync_write_stream::value, + "SyncWriteStream type requirements not met"); + + boost::system::error_code ec; + auto const bytes_transferred = write(stream, req, ec); + + if (ec) + BOOST_THROW_EXCEPTION(boost::system::system_error{ec}); + + return bytes_transferred; +} + +template< + class AsyncWriteStream, + class Event, + class CompletionToken = + net::default_completion_token_t> +auto +async_write( + AsyncWriteStream& stream, + request& req, + CompletionToken&& token = + net::default_completion_token_t{}) +{ + static_assert(boost::beast::is_async_write_stream< + AsyncWriteStream>::value, + "AsyncWriteStream type requirements not met"); + + return net::async_write(stream, net::buffer(req.payload), token); +} + +} +} +