2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Merge pull request #151 from boostorg/150-remove-resp3read-and-resp3async_read

Removes resp3::async_read.
This commit is contained in:
Marcelo
2023-09-02 14:52:36 +02:00
committed by GitHub
18 changed files with 315 additions and 760 deletions

View File

@@ -162,7 +162,6 @@ if (BOOST_REDIS_TESTS)
make_test(test_conn_exec_error 17)
make_test(test_request 17)
make_test(test_run 17)
make_test(test_low_level_sync 17)
make_test(test_low_level_sync_sans_io 17)
make_test(test_conn_check_health 17)
@@ -172,7 +171,6 @@ if (BOOST_REDIS_TESTS)
make_test(test_conn_exec_cancel 20)
make_test(test_conn_exec_cancel2 20)
make_test(test_conn_echo_stress 20)
make_test(test_low_level_async 20)
make_test(test_conn_run_cancel 20)
make_test(test_issue_50 20)
endif()

View File

@@ -8,7 +8,7 @@
#include <boost/asio/detached.hpp>
#include <iostream>
namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::connection;
using boost::redis::request;
using boost::redis::response;
@@ -29,10 +29,10 @@ auto main(int argc, char * argv[]) -> int
response<std::string> resp;
net::io_context ioc;
asio::io_context ioc;
connection conn{ioc};
conn.async_run(cfg, {}, net::detached);
conn.async_run(cfg, {}, asio::detached);
conn.async_exec(req, resp, [&](auto ec, auto) {
if (!ec)

View File

@@ -9,7 +9,6 @@
#include <string>
#include <iostream>
namespace net = boost::asio;
using boost::redis::sync_connection;
using boost::redis::request;
using boost::redis::response;

View File

@@ -17,16 +17,23 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
#if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
namespace net = boost::asio;
using stream_descriptor = net::deferred_t::as_default_on_t<net::posix::stream_descriptor>;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
using boost::redis::request;
using boost::redis::generic_response;
namespace asio = boost::asio;
using stream_descriptor = asio::deferred_t::as_default_on_t<asio::posix::stream_descriptor>;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
using boost::asio::async_read_until;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::consign;
using boost::asio::deferred;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::redirect_error;
using boost::asio::use_awaitable;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::generic_response;
using boost::redis::ignore;
using net::redirect_error;
using net::use_awaitable;
using boost::redis::request;
using boost::system::error_code;
using namespace std::chrono_literals;
@@ -34,7 +41,7 @@ using namespace std::chrono_literals;
// terminals and type messages to stdin.
auto
receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
receiver(std::shared_ptr<connection> conn) -> awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");
@@ -45,7 +52,7 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Subscribe to channels.
co_await conn->async_exec(req, ignore, net::deferred);
co_await conn->async_exec(req, ignore, deferred);
// Loop reading Redis push messages.
for (error_code ec;;) {
@@ -63,27 +70,27 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
}
// Publishes stdin messages to a Redis channel.
auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection> conn) -> net::awaitable<void>
auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection> conn) -> awaitable<void>
{
for (std::string msg;;) {
auto n = co_await net::async_read_until(*in, net::dynamic_buffer(msg, 1024), "\n");
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req, ignore, net::deferred);
co_await conn->async_exec(req, ignore, deferred);
msg.erase(0, n);
}
}
// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
auto stream = std::make_shared<stream_descriptor>(ex, ::dup(STDIN_FILENO));
net::co_spawn(ex, receiver(conn), net::detached);
net::co_spawn(ex, publisher(stream, conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
co_spawn(ex, receiver(conn), detached);
co_spawn(ex, publisher(stream, conn), detached);
conn->async_run(cfg, {}, consign(detached, conn));
signal_set sig_set{ex, SIGINT, SIGTERM};
co_await sig_set.async_wait();
@@ -92,7 +99,7 @@ auto co_main(config cfg) -> net::awaitable<void>
}
#else // defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
auto co_main(config const&) -> net::awaitable<void>
auto co_main(config const&) -> awaitable<void>
{
std::cout << "Requires support for posix streams." << std::endl;
co_return;

View File

@@ -14,13 +14,17 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
using boost::redis::ignore;
using boost::redis::config;
using boost::redis::connection;
using boost::asio::awaitable;
using boost::asio::deferred;
using boost::asio::detached;
using boost::asio::consign;
void print(std::map<std::string, std::string> const& cont)
{
@@ -35,7 +39,7 @@ void print(std::vector<int> const& cont)
}
// Stores the content of some STL containers in Redis.
auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto store(std::shared_ptr<connection> conn) -> awaitable<void>
{
std::vector<int> vec
{1, 2, 3, 4, 5, 6};
@@ -47,10 +51,10 @@ auto store(std::shared_ptr<connection> conn) -> net::awaitable<void>
req.push_range("RPUSH", "rpush-key", vec);
req.push_range("HSET", "hset-key", map);
co_await conn->async_exec(req, ignore, net::deferred);
co_await conn->async_exec(req, ignore, deferred);
}
auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto hgetall(std::shared_ptr<connection> conn) -> awaitable<void>
{
// A request contains multiple commands.
request req;
@@ -60,13 +64,13 @@ auto hgetall(std::shared_ptr<connection> conn) -> net::awaitable<void>
response<std::map<std::string, std::string>> resp;
// Executes the request and reads the response.
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, deferred);
print(std::get<0>(resp).value());
}
// Retrieves in a transaction.
auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto transaction(std::shared_ptr<connection> conn) -> awaitable<void>
{
request req;
req.push("MULTI");
@@ -81,17 +85,17 @@ auto transaction(std::shared_ptr<connection> conn) -> net::awaitable<void>
response<std::optional<std::vector<int>>, std::optional<std::map<std::string, std::string>>> // exec
> resp;
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, deferred);
print(std::get<0>(std::get<3>(resp).value()).value().value());
print(std::get<1>(std::get<3>(resp).value()).value().value());
}
// Called from the main function (see main.cpp)
net::awaitable<void> co_main(config cfg)
awaitable<void> co_main(config cfg)
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
conn->async_run(cfg, {}, consign(detached, conn));
co_await store(conn);
co_await transaction(conn);

View File

@@ -14,10 +14,10 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace net = boost::asio;
using tcp_socket = net::deferred_t::as_default_on_t<net::ip::tcp::socket>;
using tcp_acceptor = net::deferred_t::as_default_on_t<net::ip::tcp::acceptor>;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
namespace asio = boost::asio;
using tcp_socket = asio::deferred_t::as_default_on_t<asio::ip::tcp::socket>;
using tcp_acceptor = asio::deferred_t::as_default_on_t<asio::ip::tcp::acceptor>;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
@@ -25,16 +25,16 @@ using boost::system::error_code;
using boost::redis::connection;
using namespace std::chrono_literals;
auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> net::awaitable<void>
auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
response<std::string> resp;
for (std::string buffer;;) {
auto n = co_await net::async_read_until(socket, net::dynamic_buffer(buffer, 1024), "\n");
auto n = co_await asio::async_read_until(socket, asio::dynamic_buffer(buffer, 1024), "\n");
req.push("PING", buffer);
co_await conn->async_exec(req, resp, net::deferred);
co_await net::async_write(socket, net::buffer(std::get<0>(resp).value()));
co_await conn->async_exec(req, resp, asio::deferred);
co_await asio::async_write(socket, asio::buffer(std::get<0>(resp).value()));
std::get<0>(resp).value().clear();
req.clear();
buffer.erase(0, n);
@@ -42,25 +42,25 @@ auto echo_server_session(tcp_socket socket, std::shared_ptr<connection> conn) ->
}
// Listens for tcp connections.
auto listener(std::shared_ptr<connection> conn) -> net::awaitable<void>
auto listener(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
try {
auto ex = co_await net::this_coro::executor;
tcp_acceptor acc(ex, {net::ip::tcp::v4(), 55555});
auto ex = co_await asio::this_coro::executor;
tcp_acceptor acc(ex, {asio::ip::tcp::v4(), 55555});
for (;;)
net::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), net::detached);
asio::co_spawn(ex, echo_server_session(co_await acc.async_accept(), conn), asio::detached);
} catch (std::exception const& e) {
std::clog << "Listener: " << e.what() << std::endl;
}
}
// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
net::co_spawn(ex, listener(conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
asio::co_spawn(ex, listener(conn), asio::detached);
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
signal_set sig_set(ex, SIGINT, SIGTERM);
co_await sig_set.async_wait();

View File

@@ -13,17 +13,17 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::connection;
// Called from the main function (see main.cpp)
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
// A request containing only a ping command.
request req;
@@ -33,7 +33,7 @@ auto co_main(config cfg) -> net::awaitable<void>
response<std::string> resp;
// Executes the request.
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, asio::deferred);
conn->cancel();
std::cout << "PING: " << std::get<0>(resp).value() << std::endl;

View File

@@ -13,20 +13,20 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::config;
using boost::redis::logger;
using boost::redis::connection;
auto verify_certificate(bool, net::ssl::verify_context&) -> bool
auto verify_certificate(bool, asio::ssl::verify_context&) -> bool
{
std::cout << "set_verify_callback" << std::endl;
return true;
}
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
cfg.use_ssl = true;
cfg.username = "aedis";
@@ -34,18 +34,18 @@ auto co_main(config cfg) -> net::awaitable<void>
cfg.addr.host = "db.occase.de";
cfg.addr.port = "6380";
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
request req;
req.push("PING");
response<std::string> resp;
conn->next_layer().set_verify_mode(net::ssl::verify_peer);
conn->next_layer().set_verify_mode(asio::ssl::verify_peer);
conn->next_layer().set_verify_callback(verify_certificate);
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, asio::deferred);
conn->cancel();
std::cout << "Response: " << std::get<0>(resp).value() << std::endl;

View File

@@ -23,7 +23,7 @@
#include <boost/redis/resp3/serialization.hpp>
#include <boost/json/src.hpp>
namespace net = boost::asio;
namespace asio = boost::asio;
using namespace boost::describe;
using boost::redis::request;
using boost::redis::response;
@@ -48,11 +48,11 @@ void boost_redis_to_bulk(std::string& to, user const& u)
void boost_redis_from_bulk(user& u, std::string_view sv, boost::system::error_code&)
{ u = boost::json::value_to<user>(boost::json::parse(sv)); }
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
// user object that will be stored in Redis in json format.
user const u{"Joao", "58", "Brazil"};
@@ -64,7 +64,7 @@ auto co_main(config cfg) -> net::awaitable<void>
response<ignore_t, user> resp;
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, asio::deferred);
conn->cancel();
// Prints the first ping

View File

@@ -19,7 +19,7 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::request;
using boost::redis::response;
using boost::redis::operation;
@@ -58,11 +58,11 @@ void boost_redis_from_bulk(person& u, std::string_view sv, boost::system::error_
using tutorial::boost_redis_to_bulk;
using tutorial::boost_redis_from_bulk;
net::awaitable<void> co_main(config cfg)
asio::awaitable<void> co_main(config cfg)
{
auto ex = co_await net::this_coro::executor;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
person p;
p.set_name("Louis");
@@ -76,7 +76,7 @@ net::awaitable<void> co_main(config cfg)
response<ignore_t, person> resp;
// Sends the request and receives the response.
co_await conn->async_exec(req, resp, net::deferred);
co_await conn->async_exec(req, resp, asio::deferred);
conn->cancel();
std::cout

View File

@@ -12,8 +12,8 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace net = boost::asio;
using endpoints = net::ip::tcp::resolver::results_type;
namespace asio = boost::asio;
using endpoints = asio::ip::tcp::resolver::results_type;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore_t;
@@ -22,18 +22,18 @@ using boost::redis::address;
using boost::redis::connection;
auto redir(boost::system::error_code& ec)
{ return net::redirect_error(net::use_awaitable, ec); }
{ return asio::redirect_error(asio::use_awaitable, ec); }
// For more info see
// - https://redis.io/docs/manual/sentinel.
// - https://redis.io/docs/reference/sentinel-clients.
auto resolve_master_address(std::vector<address> const& addresses) -> net::awaitable<address>
auto resolve_master_address(std::vector<address> const& addresses) -> asio::awaitable<address>
{
request req;
req.push("SENTINEL", "get-master-addr-by-name", "mymaster");
req.push("QUIT");
auto conn = std::make_shared<connection>(co_await net::this_coro::executor);
auto conn = std::make_shared<connection>(co_await asio::this_coro::executor);
response<std::optional<std::array<std::string, 2>>, ignore_t> resp;
for (auto addr : addresses) {
@@ -43,7 +43,7 @@ auto resolve_master_address(std::vector<address> const& addresses) -> net::await
// TODO: async_run and async_exec should be lauched in
// parallel here so we can wait for async_run completion
// before eventually calling it again.
conn->async_run(cfg, {}, net::consign(net::detached, conn));
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
co_await conn->async_exec(req, resp, redir(ec));
conn->cancel();
conn->reset_stream();
@@ -54,7 +54,7 @@ auto resolve_master_address(std::vector<address> const& addresses) -> net::await
co_return address{};
}
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
// A list of sentinel addresses from which only one is responsive.
// This simulates sentinels that are down.

View File

@@ -18,7 +18,7 @@
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace net = boost::asio;
namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::redis::request;
using boost::redis::generic_response;
@@ -27,7 +27,7 @@ using boost::redis::config;
using boost::redis::ignore;
using boost::system::error_code;
using boost::redis::connection;
using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
using signal_set = asio::deferred_t::as_default_on_t<asio::signal_set>;
/* This example will subscribe and read pushes indefinitely.
*
@@ -47,7 +47,7 @@ using signal_set = net::deferred_t::as_default_on_t<net::signal_set>;
// Receives server pushes.
auto
receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
{
request req;
req.push("SUBSCRIBE", "channel");
@@ -59,11 +59,11 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
while (conn->will_reconnect()) {
// Reconnect to channels.
co_await conn->async_exec(req, ignore, net::deferred);
co_await conn->async_exec(req, ignore, asio::deferred);
// Loop reading Redis pushs messages.
for (error_code ec;;) {
co_await conn->async_receive(net::redirect_error(net::use_awaitable, ec));
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
std::cout
@@ -76,12 +76,12 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
}
}
auto co_main(config cfg) -> net::awaitable<void>
auto co_main(config cfg) -> asio::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
auto ex = co_await asio::this_coro::executor;
auto conn = std::make_shared<connection>(ex);
net::co_spawn(ex, receiver(conn), net::detached);
conn->async_run(cfg, {}, net::consign(net::detached, conn));
asio::co_spawn(ex, receiver(conn), asio::detached);
conn->async_run(cfg, {}, asio::consign(asio::detached, conn));
signal_set sig_set(ex, SIGINT, SIGTERM);
co_await sig_set.async_wait();

View File

@@ -11,13 +11,13 @@
#include <boost/asio/io_context.hpp>
#include <iostream>
namespace net = boost::asio;
namespace asio = boost::asio;
using boost::redis::config;
using boost::redis::logger;
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
extern net::awaitable<void> co_main(config);
extern asio::awaitable<void> co_main(config);
auto main(int argc, char * argv[]) -> int
{
@@ -29,8 +29,8 @@ auto main(int argc, char * argv[]) -> int
cfg.addr.port = argv[2];
}
net::io_context ioc;
net::co_spawn(ioc, co_main(cfg), [](std::exception_ptr p) {
asio::io_context ioc;
asio::co_spawn(ioc, co_main(cfg), [](std::exception_ptr p) {
if (p)
std::rethrow_exception(p);
});

View File

@@ -9,7 +9,6 @@
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/detail/read.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/request.hpp>
@@ -44,6 +43,66 @@
namespace boost::redis::detail
{
template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
return std::string_view{start, std::size(buf)};
}
template <class AsyncReadStream, class DynamicBuffer>
class append_some_op {
private:
AsyncReadStream& stream_;
DynamicBuffer buf_;
std::size_t size_ = 0;
std::size_t tmp_ = 0;
asio::coroutine coro_{};
public:
append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
: stream_ {stream}
, buf_ {std::move(buf)}
, size_{size}
{ }
template <class Self>
void operator()( Self& self
, system::error_code ec = {}
, std::size_t n = 0)
{
BOOST_ASIO_CORO_REENTER (coro_)
{
tmp_ = buf_.size();
buf_.grow(size_);
BOOST_ASIO_CORO_YIELD
stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
buf_.shrink(buf_.size() - tmp_ - n);
self.complete({}, n);
}
}
};
template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
auto
async_append_some(
AsyncReadStream& stream,
DynamicBuffer buffer,
std::size_t size,
CompletionToken&& token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code, std::size_t)
>(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
}
template <class Conn>
struct exec_op {
using req_info_type = typename Conn::req_info;
@@ -128,9 +187,7 @@ struct run_op {
{
BOOST_ASIO_CORO_REENTER (coro)
{
conn->write_buffer_.clear();
conn->read_buffer_.clear();
conn->parser_.reset();
conn->reset();
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
@@ -331,8 +388,6 @@ public:
using clock_traits_type = asio::wait_traits<clock_type>;
using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
using this_type = connection_base<Executor>;
/// Constructs from an executor.
@@ -438,6 +493,7 @@ private:
using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
using runner_type = runner<executor_type>;
using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;
auto use_ssl() const noexcept
{ return runner_.get_config().use_ssl;}
@@ -726,7 +782,6 @@ private:
void close()
{
if (stream_->next_layer().is_open()) {
// TODO: Communicate the error to the caller.
system::error_code ec;
stream_->next_layer().close(ec);
}
@@ -845,6 +900,14 @@ private:
return on_finish_parsing(parse_result::resp);
}
void reset()
{
write_buffer_.clear();
read_buffer_.clear();
parser_.reset();
on_push_ = false;
}
asio::ssl::context ctx_;
std::unique_ptr<next_layer_type> stream_;

View File

@@ -1,291 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#ifndef BOOST_REDIS_READ_HPP
#define BOOST_REDIS_READ_HPP
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/resp3/parser.hpp>
#include <boost/redis/adapter/ignore.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/compose.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/post.hpp>
#include <string_view>
#include <limits>
namespace boost::redis::detail {
template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
return std::string_view{start, std::size(buf)};
}
template <class AsyncReadStream, class DynamicBuffer>
class append_some_op {
private:
AsyncReadStream& stream_;
DynamicBuffer buf_;
std::size_t size_ = 0;
std::size_t tmp_ = 0;
asio::coroutine coro_{};
public:
append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
: stream_ {stream}
, buf_ {std::move(buf)}
, size_{size}
{ }
template <class Self>
void operator()( Self& self
, system::error_code ec = {}
, std::size_t n = 0)
{
BOOST_ASIO_CORO_REENTER (coro_)
{
tmp_ = buf_.size();
buf_.grow(size_);
BOOST_ASIO_CORO_YIELD
stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
buf_.shrink(buf_.size() - tmp_ - n);
self.complete({}, n);
}
}
};
template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
auto
async_append_some(
AsyncReadStream& stream,
DynamicBuffer buffer,
std::size_t size,
CompletionToken&& token)
{
return asio::async_compose
< CompletionToken
, void(system::error_code, std::size_t)
>(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
}
template <
class AsyncReadStream,
class DynamicBuffer,
class ResponseAdapter>
class parse_op {
private:
AsyncReadStream& stream_;
DynamicBuffer buf_;
resp3::parser parser_;
ResponseAdapter adapter_;
bool needs_rescheduling_ = true;
system::error_code ec_;
asio::coroutine coro_{};
static std::size_t const growth = 1024;
public:
parse_op(AsyncReadStream& stream, DynamicBuffer buf, ResponseAdapter adapter)
: stream_ {stream}
, buf_ {std::move(buf)}
, adapter_ {std::move(adapter)}
{ }
template <class Self>
void operator()( Self& self
, system::error_code ec = {}
, std::size_t = 0)
{
BOOST_ASIO_CORO_REENTER (coro_)
{
while (!resp3::parse(parser_, buffer_view(buf_), adapter_, ec)) {
needs_rescheduling_ = false;
BOOST_ASIO_CORO_YIELD
async_append_some(
stream_, buf_, parser_.get_suggested_buffer_growth(growth),
std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}
}
ec_ = ec;
if (needs_rescheduling_) {
BOOST_ASIO_CORO_YIELD
asio::post(std::move(self));
}
self.complete(ec_, parser_.get_consumed());
}
}
};
/** \brief Reads a complete response to a command sychronously.
*
* This function reads a complete response to a command or a
* server push synchronously. For example
*
* @code
* int resp;
* std::string buffer;
* resp3::read(socket, dynamic_buffer(buffer), adapt(resp));
* @endcode
*
* For a complete example see examples/intro_sync.cpp. This function
* is implemented in terms of one or more calls to @c
* asio::read_until and @c asio::read functions, and is known as a @a
* composed @a operation. Furthermore, the implementation may read
* additional bytes from the stream that lie past the end of the
* message being read. These additional bytes are stored in the
* dynamic buffer, which must be preserved for subsequent reads.
*
* \param stream The stream from which to read e.g. a tcp socket.
* \param buf Dynamic buffer (version 2).
* \param adapter The response adapter.
* \param ec If an error occurs, it will be assigned to this paramter.
* \returns The number of bytes that have been consumed from the dynamic buffer.
*
* \remark This function calls buf.consume() in each chunk of data
* after it has been passed to the adapter. Users must not consume
* the bytes after it returns.
*/
template <
class SyncReadStream,
class DynamicBuffer,
class ResponseAdapter
>
auto
read(
SyncReadStream& stream,
DynamicBuffer buf,
ResponseAdapter adapter,
system::error_code& ec) -> std::size_t
{
static std::size_t const growth = 1024;
resp3::parser parser;
while (!parser.done()) {
auto const res = parser.consume(detail::buffer_view(buf), ec);
if (ec)
return 0UL;
if (!res.has_value()) {
auto const size_before = buf.size();
buf.grow(parser.get_suggested_buffer_growth(growth));
auto const n =
stream.read_some(
buf.data(size_before, parser.get_suggested_buffer_growth(growth)),
ec);
if (ec)
return 0UL;
buf.shrink(buf.size() - size_before - n);
continue;
}
adapter(res.value(), ec);
if (ec)
return 0UL;
}
return parser.get_consumed();
}
/** \brief Reads a complete response to a command sychronously.
*
* Same as the error_code overload but throws on error.
*/
template<
class SyncReadStream,
class DynamicBuffer,
class ResponseAdapter = adapter::ignore>
auto
read(
SyncReadStream& stream,
DynamicBuffer buf,
ResponseAdapter adapter = ResponseAdapter{})
{
system::error_code ec;
auto const n = redis::detail::read(stream, buf, adapter, ec);
if (ec)
BOOST_THROW_EXCEPTION(system::system_error{ec});
return n;
}
/** \brief Reads a complete response to a Redis command asynchronously.
*
* This function reads a complete response to a command or a
* server push asynchronously. For example
*
* @code
* std::string buffer;
* std::set<std::string> resp;
* co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(resp));
* @endcode
*
* For a complete example see examples/transaction.cpp. This function
* is implemented in terms of one or more calls to @c
* asio::async_read_until and @c asio::async_read functions, and is
* known as a @a composed @a operation. Furthermore, the
* implementation may read additional bytes from the stream that lie
* past the end of the message being read. These additional bytes are
* stored in the dynamic buffer, which must be preserved for
* subsequent reads.
*
* \param stream The stream from which to read e.g. a tcp socket.
* \param buffer Dynamic buffer (version 2).
* \param adapter The response adapter.
* \param token The completion token.
*
* The completion handler will receive as a parameter the total
* number of bytes transferred from the stream and must have the
* following signature
*
* @code
* void(system::error_code, std::size_t);
* @endcode
*
* \remark This function calls buf.consume() in each chunk of data
* after it has been passed to the adapter. Users must not consume
* the bytes after it returns.
*/
template <
class AsyncReadStream,
class DynamicBuffer,
class ResponseAdapter = adapter::ignore,
class CompletionToken = asio::default_completion_token_t<typename AsyncReadStream::executor_type>
>
auto async_read(
AsyncReadStream& stream,
DynamicBuffer buffer,
ResponseAdapter adapter = ResponseAdapter{},
CompletionToken&& token =
asio::default_completion_token_t<typename AsyncReadStream::executor_type>{})
{
return asio::async_compose
< CompletionToken
, void(system::error_code, std::size_t)
>(parse_op<AsyncReadStream, DynamicBuffer, ResponseAdapter> {stream, buffer, adapter},
token,
stream);
}
} // boost::redis::detail
#endif // BOOST_REDIS_READ_HPP

View File

@@ -7,16 +7,11 @@
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/detail/read.hpp>
#include <boost/system/errc.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/beast/_experimental/test/stream.hpp>
#include <boost/redis/resp3/parser.hpp>
#define BOOST_TEST_MODULE low level
#include <boost/test/included/unit_test.hpp>
#include <map>
#include <iostream>
#include <optional>
@@ -30,16 +25,17 @@ auto operator==(boost::redis::ignore_t, boost::redis::ignore_t) noexcept {return
auto operator!=(boost::redis::ignore_t, boost::redis::ignore_t) noexcept {return false;}
}
namespace net = boost::asio;
namespace redis = boost::redis;
namespace resp3 = boost::redis::resp3;
using boost::system::error_code;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::adapter::result;
using boost::redis::resp3::parser;
using boost::redis::resp3::parse;
using test_stream = boost::beast::test::stream;
using boost::redis::adapter::adapt2;
using node_type = result<resp3::node>;
using vec_node_type = result<std::vector<resp3::node>>;
@@ -82,95 +78,51 @@ template <class Result>
struct expect {
std::string in;
Result expected;
boost::system::error_code ec{};
error_code ec{};
resp3::type error_type = resp3::type::invalid;
};
template <class Result>
auto make_expected(std::string in, Result expected, boost::system::error_code ec = {}, resp3::type error_type = resp3::type::invalid)
auto make_expected(std::string in, Result expected, error_code ec = {}, resp3::type error_type = resp3::type::invalid)
{
return expect<Result>{in, expected, ec, error_type};
}
template <class Result>
void test_sync(net::any_io_executor ex, expect<Result> e)
void test_sync(expect<Result> e)
{
std::string rbuffer;
test_stream ts {ex};
ts.append(e.in);
parser p;
Result result;
boost::system::error_code ec;
auto dbuf = net::dynamic_buffer(rbuffer);
auto const consumed = redis::detail::read(ts, dbuf, adapt2(result), ec);
if (e.ec) {
auto adapter = adapt2(result);
error_code ec;
auto const res = parse(p, e.in, adapter, ec);
BOOST_TEST(res); // None of these tests need more data.
if (ec) {
BOOST_CHECK_EQUAL(ec, e.ec);
return;
}
dbuf.consume(consumed);
BOOST_TEST(!ec);
BOOST_TEST(rbuffer.empty());
if (result.has_value()) {
auto const res = result == e.expected;
BOOST_TEST(res);
BOOST_TEST(bool(result == e.expected));
BOOST_CHECK_EQUAL(e.in.size(), p.get_consumed());
} else {
BOOST_TEST(result.has_error());
BOOST_CHECK_EQUAL(result.error().data_type, e.error_type);
}
}
template <class Result>
class async_test: public std::enable_shared_from_this<async_test<Result>> {
private:
std::string rbuffer_;
test_stream ts_;
expect<Result> data_;
Result result_;
public:
async_test(net::any_io_executor ex, expect<Result> e)
: ts_{ex}
, data_{e}
{
ts_.append(e.in);
}
void run()
{
auto self = this->shared_from_this();
auto f = [self](auto ec, auto)
{
if (self->data_.ec) {
BOOST_CHECK_EQUAL(ec, self->data_.ec);
return;
}
BOOST_TEST(!ec);
//BOOST_TEST(self->rbuffer_.empty());
if (self->result_.has_value()) {
auto const res = self->result_ == self->data_.expected;
BOOST_TEST(res);
} else {
BOOST_TEST(self->result_.has_error());
BOOST_CHECK_EQUAL(self->result_.error().data_type, self->data_.error_type);
}
};
redis::detail::async_read(
ts_,
net::dynamic_buffer(rbuffer_),
adapt2(result_),
f);
}
};
template <class Result>
void test_async(net::any_io_executor ex, expect<Result> e)
void test_sync2(expect<Result> e)
{
std::make_shared<async_test<Result>>(ex, e)->run();
parser p;
Result result;
auto adapter = adapt2(result);
error_code ec;
auto const res = parse(p, e.in, adapter, ec);
BOOST_TEST(res); // None of these tests need more data.
BOOST_CHECK_EQUAL(ec, e.ec);
}
auto make_blob()
@@ -397,173 +349,136 @@ vec_node_type const attr_e1b
#define S18d "$0\r\n\r\n"
#define NUMBER_TEST_CONDITIONS(test) \
test(ex, make_expected(S01a, result<std::optional<bool>>{}, boost::redis::error::unexpected_bool_value)); \
test(ex, make_expected(S01b, result<bool>{{false}})); \
test(ex, make_expected(S01b, node_type{{resp3::type::boolean, 1UL, 0UL, {"f"}}})); \
test(ex, make_expected(S01c, result<bool>{{true}})); \
test(ex, make_expected(S01c, node_type{{resp3::type::boolean, 1UL, 0UL, {"t"}}})); \
test(ex, make_expected(S01c, op_bool_ok)); \
test(ex, make_expected(S01c, result<std::map<int, int>>{}, boost::redis::error::expects_resp3_map)); \
test(ex, make_expected(S01c, result<std::set<int>>{}, boost::redis::error::expects_resp3_set)); \
test(ex, make_expected(S01c, result<std::unordered_map<int, int>>{}, boost::redis::error::expects_resp3_map)); \
test(ex, make_expected(S01c, result<std::unordered_set<int>>{}, boost::redis::error::expects_resp3_set)); \
test(ex, make_expected(S02a, streamed_string_e2)); \
test(ex, make_expected(S03a, result<int>{}, boost::redis::error::expects_resp3_simple_type));\
test(ex, make_expected(S03a, result<std::optional<int>>{}, boost::redis::error::expects_resp3_simple_type));; \
test(ex, make_expected(S02b, result<int>{}, boost::redis::error::not_a_number)); \
test(ex, make_expected(S02b, result<std::string>{std::string{"Hello word"}})); \
test(ex, make_expected(S02b, streamed_string_e1)); \
test(ex, make_expected(S02c, result<std::string>{}, boost::redis::error::not_a_number)); \
test(ex, make_expected(S05a, node_type{{resp3::type::number, 1UL, 0UL, {"-3"}}})); \
test(ex, make_expected(S05b, result<int>{11})); \
test(ex, make_expected(S05b, op_int_ok)); \
test(ex, make_expected(S05b, result<std::list<std::string>>{}, boost::redis::error::expects_resp3_aggregate)); \
test(ex, make_expected(S05b, result<std::map<std::string, std::string>>{}, boost::redis::error::expects_resp3_map)); \
test(ex, make_expected(S05b, result<std::set<std::string>>{}, boost::redis::error::expects_resp3_set)); \
test(ex, make_expected(S05b, result<std::unordered_map<std::string, std::string>>{}, boost::redis::error::expects_resp3_map)); \
test(ex, make_expected(S05b, result<std::unordered_set<std::string>>{}, boost::redis::error::expects_resp3_set)); \
test(ex, make_expected(s05c, array_type2{}, boost::redis::error::expects_resp3_aggregate));\
test(ex, make_expected(s05c, node_type{{resp3::type::number, 1UL, 0UL, {"3"}}}));\
test(ex, make_expected(S06a, op_type_01{})); \
test(ex, make_expected(S06a, op_type_02{}));\
test(ex, make_expected(S06a, op_type_03{}));\
test(ex, make_expected(S06a, op_type_04{}));\
test(ex, make_expected(S06a, op_type_05{}));\
test(ex, make_expected(S06a, op_type_06{}));\
test(ex, make_expected(S06a, op_type_07{}));\
test(ex, make_expected(S06a, op_type_08{}));\
test(ex, make_expected(S06a, op_type_09{}));\
test(ex, make_expected(S07a, push_e1a)); \
test(ex, make_expected(S07b, push_e1b)); \
test(ex, make_expected(S04b, map_type{}, boost::redis::error::expects_resp3_map));\
test(ex, make_expected(S03b, map_e1f));\
test(ex, make_expected(S03b, map_e1g));\
test(ex, make_expected(S03b, map_e1k));\
test(ex, make_expected(S03b, map_expected_1a));\
test(ex, make_expected(S03b, map_expected_1b));\
test(ex, make_expected(S03b, map_expected_1c));\
test(ex, make_expected(S03b, map_expected_1d));\
test(ex, make_expected(S03b, map_expected_1e));\
test(ex, make_expected(S08a, attr_e1a)); \
test(ex, make_expected(S08b, attr_e1b)); \
test(ex, make_expected(S04e, array_e1a));\
test(ex, make_expected(S04e, array_e1b));\
test(ex, make_expected(S04e, array_e1c));\
test(ex, make_expected(S04e, array_e1f));\
test(ex, make_expected(S04e, array_e1g));\
test(ex, make_expected(S04e, array_e1h));\
test(ex, make_expected(S04e, array_type2{}, boost::redis::error::incompatible_size));\
test(ex, make_expected(S04e, tuple_int_2{}, boost::redis::error::incompatible_size));\
test(ex, make_expected(S04f, array_type2{}, boost::redis::error::nested_aggregate_not_supported));\
test(ex, make_expected(S04g, vec_node_type{}, boost::redis::error::exceeeds_max_nested_depth));\
test(ex, make_expected(S04h, array_e1d));\
test(ex, make_expected(S04h, array_e1e));\
test(ex, make_expected(S04i, set_type{}, boost::redis::error::expects_resp3_set)); \
test(ex, make_expected(S09a, set_e1c)); \
test(ex, make_expected(S09a, set_e1d)); \
test(ex, make_expected(S09a, set_e1f)); \
test(ex, make_expected(S09a, set_e1g)); \
test(ex, make_expected(S09a, set_expected1a)); \
test(ex, make_expected(S09a, set_expected_1e)); \
test(ex, make_expected(S09a, set_type{{"apple", "one", "orange", "three", "two"}})); \
test(ex, make_expected(S09b, vec_node_type{{{resp3::type::set, 0UL, 0UL, {}}}})); \
test(ex, make_expected(S03c, map_type{}));\
test(ex, make_expected(S11a, node_type{{resp3::type::doublean, 1UL, 0UL, {"1.23"}}}));\
test(ex, make_expected(S11b, node_type{{resp3::type::doublean, 1UL, 0UL, {"inf"}}}));\
test(ex, make_expected(S11c, node_type{{resp3::type::doublean, 1UL, 0UL, {"-inf"}}}));\
test(ex, make_expected(S11d, result<double>{{1.23}}));\
test(ex, make_expected(S11e, result<double>{{0}}, boost::redis::error::not_a_double));\
test(ex, make_expected(S13a, node_type{{resp3::type::verbatim_string, 1UL, 0UL, {"txt:Some string"}}}));\
test(ex, make_expected(S13b, node_type{{resp3::type::verbatim_string, 1UL, 0UL, {}}}));\
test(ex, make_expected(S14a, node_type{{resp3::type::big_number, 1UL, 0UL, {"3492890328409238509324850943850943825024385"}}}));\
test(ex, make_expected(S14b, result<int>{}, boost::redis::error::empty_field));\
test(ex, make_expected(S15a, result<std::optional<std::string>>{{"OK"}}));\
test(ex, make_expected(S15a, result<std::string>{{"OK"}}));\
test(ex, make_expected(S15b, result<std::optional<std::string>>{""}));\
test(ex, make_expected(S15b, result<std::string>{{""}}));\
test(ex, make_expected(S16a, result<int>{}, boost::redis::error::invalid_data_type));\
test(ex, make_expected(S05d, result<int>{11}, boost::redis::error::not_a_number));\
test(ex, make_expected(S03d, map_type{}, boost::redis::error::not_a_number));\
test(ex, make_expected(S02d, result<std::string>{}, boost::redis::error::not_a_number));\
test(ex, make_expected(S17a, result<std::string>{}, boost::redis::error::not_a_number));\
test(ex, make_expected(S05e, result<int>{}, boost::redis::error::empty_field));\
test(ex, make_expected(S01d, result<std::optional<bool>>{}, boost::redis::error::empty_field));\
test(ex, make_expected(S11f, result<std::string>{}, boost::redis::error::empty_field));\
test(ex, make_expected(S17b, node_type{{resp3::type::blob_string, 1UL, 0UL, {"hh"}}}));\
test(ex, make_expected(S18c, node_type{{resp3::type::blob_string, 1UL, 0UL, {"hhaa\aaaa\raaaaa\r\naaaaaaaaaa"}}}));\
test(ex, make_expected(S18d, node_type{{resp3::type::blob_string, 1UL, 0UL, {}}}));\
test(ex, make_expected(make_blob_string(blob), node_type{{resp3::type::blob_string, 1UL, 0UL, {blob}}}));\
test(ex, make_expected(S04a, result<std::vector<int>>{{11}})); \
test(ex, make_expected(S04d, result<response<std::unordered_set<std::string>>>{response<std::unordered_set<std::string>>{{set_e1c}}})); \
test(ex, make_expected(S04c, result<response<std::map<std::string, std::string>>>{response<std::map<std::string, std::string>>{{map_expected_1b}}}));\
test(ex, make_expected(S03b, map_e1l));\
test(ex, make_expected(S06a, result<int>{0}, {}, resp3::type::null)); \
test(ex, make_expected(S06a, map_type{}, {}, resp3::type::null));\
test(ex, make_expected(S06a, array_type{}, {}, resp3::type::null));\
test(ex, make_expected(S06a, result<std::list<int>>{}, {}, resp3::type::null));\
test(ex, make_expected(S06a, result<std::vector<int>>{}, {}, resp3::type::null));\
test(ex, make_expected(S10a, result<ignore_t>{}, boost::redis::error::resp3_simple_error)); \
test(ex, make_expected(S10a, node_type{{resp3::type::simple_error, 1UL, 0UL, {"Error"}}}, {}, resp3::type::simple_error)); \
test(ex, make_expected(S10b, node_type{{resp3::type::simple_error, 1UL, 0UL, {""}}}, {}, resp3::type::simple_error)); \
test(ex, make_expected(S12a, node_type{{resp3::type::blob_error, 1UL, 0UL, {"SYNTAX invalid syntax"}}}, {}, resp3::type::blob_error));\
test(ex, make_expected(S12b, node_type{{resp3::type::blob_error, 1UL, 0UL, {}}}, {}, resp3::type::blob_error));\
test(ex, make_expected(S12c, result<ignore_t>{}, boost::redis::error::resp3_blob_error));\
test(make_expected(S01a, result<std::optional<bool>>{}, boost::redis::error::unexpected_bool_value)); \
test(make_expected(S01b, result<bool>{{false}})); \
test(make_expected(S01b, node_type{{resp3::type::boolean, 1UL, 0UL, {"f"}}})); \
test(make_expected(S01c, result<bool>{{true}})); \
test(make_expected(S01c, node_type{{resp3::type::boolean, 1UL, 0UL, {"t"}}})); \
test(make_expected(S01c, op_bool_ok)); \
test(make_expected(S01c, result<std::map<int, int>>{}, boost::redis::error::expects_resp3_map)); \
test(make_expected(S01c, result<std::set<int>>{}, boost::redis::error::expects_resp3_set)); \
test(make_expected(S01c, result<std::unordered_map<int, int>>{}, boost::redis::error::expects_resp3_map)); \
test(make_expected(S01c, result<std::unordered_set<int>>{}, boost::redis::error::expects_resp3_set)); \
test(make_expected(S02a, streamed_string_e2)); \
test(make_expected(S03a, result<int>{}, boost::redis::error::expects_resp3_simple_type));\
test(make_expected(S03a, result<std::optional<int>>{}, boost::redis::error::expects_resp3_simple_type));; \
test(make_expected(S02b, result<int>{}, boost::redis::error::not_a_number)); \
test(make_expected(S02b, result<std::string>{std::string{"Hello word"}})); \
test(make_expected(S02b, streamed_string_e1)); \
test(make_expected(S02c, result<std::string>{}, boost::redis::error::not_a_number)); \
test(make_expected(S05a, node_type{{resp3::type::number, 1UL, 0UL, {"-3"}}})); \
test(make_expected(S05b, result<int>{11})); \
test(make_expected(S05b, op_int_ok)); \
test(make_expected(S05b, result<std::list<std::string>>{}, boost::redis::error::expects_resp3_aggregate)); \
test(make_expected(S05b, result<std::map<std::string, std::string>>{}, boost::redis::error::expects_resp3_map)); \
test(make_expected(S05b, result<std::set<std::string>>{}, boost::redis::error::expects_resp3_set)); \
test(make_expected(S05b, result<std::unordered_map<std::string, std::string>>{}, boost::redis::error::expects_resp3_map)); \
test(make_expected(S05b, result<std::unordered_set<std::string>>{}, boost::redis::error::expects_resp3_set)); \
test(make_expected(s05c, array_type2{}, boost::redis::error::expects_resp3_aggregate));\
test(make_expected(s05c, node_type{{resp3::type::number, 1UL, 0UL, {"3"}}}));\
test(make_expected(S06a, op_type_01{})); \
test(make_expected(S06a, op_type_02{}));\
test(make_expected(S06a, op_type_03{}));\
test(make_expected(S06a, op_type_04{}));\
test(make_expected(S06a, op_type_05{}));\
test(make_expected(S06a, op_type_06{}));\
test(make_expected(S06a, op_type_07{}));\
test(make_expected(S06a, op_type_08{}));\
test(make_expected(S06a, op_type_09{}));\
test(make_expected(S07a, push_e1a)); \
test(make_expected(S07b, push_e1b)); \
test(make_expected(S04b, map_type{}, boost::redis::error::expects_resp3_map));\
test(make_expected(S03b, map_e1f));\
test(make_expected(S03b, map_e1g));\
test(make_expected(S03b, map_e1k));\
test(make_expected(S03b, map_expected_1a));\
test(make_expected(S03b, map_expected_1b));\
test(make_expected(S03b, map_expected_1c));\
test(make_expected(S03b, map_expected_1d));\
test(make_expected(S03b, map_expected_1e));\
test(make_expected(S08a, attr_e1a)); \
test(make_expected(S08b, attr_e1b)); \
test(make_expected(S04e, array_e1a));\
test(make_expected(S04e, array_e1b));\
test(make_expected(S04e, array_e1c));\
test(make_expected(S04e, array_e1f));\
test(make_expected(S04e, array_e1g));\
test(make_expected(S04e, array_e1h));\
test(make_expected(S04e, array_type2{}, boost::redis::error::incompatible_size));\
test(make_expected(S04e, tuple_int_2{}, boost::redis::error::incompatible_size));\
test(make_expected(S04f, array_type2{}, boost::redis::error::nested_aggregate_not_supported));\
test(make_expected(S04g, vec_node_type{}, boost::redis::error::exceeeds_max_nested_depth));\
test(make_expected(S04h, array_e1d));\
test(make_expected(S04h, array_e1e));\
test(make_expected(S04i, set_type{}, boost::redis::error::expects_resp3_set)); \
test(make_expected(S09a, set_e1c)); \
test(make_expected(S09a, set_e1d)); \
test(make_expected(S09a, set_e1f)); \
test(make_expected(S09a, set_e1g)); \
test(make_expected(S09a, set_expected1a)); \
test(make_expected(S09a, set_expected_1e)); \
test(make_expected(S09a, set_type{{"apple", "one", "orange", "three", "two"}})); \
test(make_expected(S09b, vec_node_type{{{resp3::type::set, 0UL, 0UL, {}}}})); \
test(make_expected(S03c, map_type{}));\
test(make_expected(S11a, node_type{{resp3::type::doublean, 1UL, 0UL, {"1.23"}}}));\
test(make_expected(S11b, node_type{{resp3::type::doublean, 1UL, 0UL, {"inf"}}}));\
test(make_expected(S11c, node_type{{resp3::type::doublean, 1UL, 0UL, {"-inf"}}}));\
test(make_expected(S11d, result<double>{{1.23}}));\
test(make_expected(S11e, result<double>{{0}}, boost::redis::error::not_a_double));\
test(make_expected(S13a, node_type{{resp3::type::verbatim_string, 1UL, 0UL, {"txt:Some string"}}}));\
test(make_expected(S13b, node_type{{resp3::type::verbatim_string, 1UL, 0UL, {}}}));\
test(make_expected(S14a, node_type{{resp3::type::big_number, 1UL, 0UL, {"3492890328409238509324850943850943825024385"}}}));\
test(make_expected(S14b, result<int>{}, boost::redis::error::empty_field));\
test(make_expected(S15a, result<std::optional<std::string>>{{"OK"}}));\
test(make_expected(S15a, result<std::string>{{"OK"}}));\
test(make_expected(S15b, result<std::optional<std::string>>{""}));\
test(make_expected(S15b, result<std::string>{{""}}));\
test(make_expected(S16a, result<int>{}, boost::redis::error::invalid_data_type));\
test(make_expected(S05d, result<int>{11}, boost::redis::error::not_a_number));\
test(make_expected(S03d, map_type{}, boost::redis::error::not_a_number));\
test(make_expected(S02d, result<std::string>{}, boost::redis::error::not_a_number));\
test(make_expected(S17a, result<std::string>{}, boost::redis::error::not_a_number));\
test(make_expected(S05e, result<int>{}, boost::redis::error::empty_field));\
test(make_expected(S01d, result<std::optional<bool>>{}, boost::redis::error::empty_field));\
test(make_expected(S11f, result<std::string>{}, boost::redis::error::empty_field));\
test(make_expected(S17b, node_type{{resp3::type::blob_string, 1UL, 0UL, {"hh"}}}));\
test(make_expected(S18c, node_type{{resp3::type::blob_string, 1UL, 0UL, {"hhaa\aaaa\raaaaa\r\naaaaaaaaaa"}}}));\
test(make_expected(S18d, node_type{{resp3::type::blob_string, 1UL, 0UL, {}}}));\
test(make_expected(make_blob_string(blob), node_type{{resp3::type::blob_string, 1UL, 0UL, {blob}}}));\
test(make_expected(S04a, result<std::vector<int>>{{11}})); \
test(make_expected(S04d, result<response<std::unordered_set<std::string>>>{response<std::unordered_set<std::string>>{{set_e1c}}})); \
test(make_expected(S04c, result<response<std::map<std::string, std::string>>>{response<std::map<std::string, std::string>>{{map_expected_1b}}}));\
test(make_expected(S03b, map_e1l));\
test(make_expected(S06a, result<int>{0}, {}, resp3::type::null)); \
test(make_expected(S06a, map_type{}, {}, resp3::type::null));\
test(make_expected(S06a, array_type{}, {}, resp3::type::null));\
test(make_expected(S06a, result<std::list<int>>{}, {}, resp3::type::null));\
test(make_expected(S06a, result<std::vector<int>>{}, {}, resp3::type::null));\
test(make_expected(S10a, result<ignore_t>{}, boost::redis::error::resp3_simple_error)); \
test(make_expected(S10a, node_type{{resp3::type::simple_error, 1UL, 0UL, {"Error"}}}, {}, resp3::type::simple_error)); \
test(make_expected(S10b, node_type{{resp3::type::simple_error, 1UL, 0UL, {""}}}, {}, resp3::type::simple_error)); \
test(make_expected(S12a, node_type{{resp3::type::blob_error, 1UL, 0UL, {"SYNTAX invalid syntax"}}}, {}, resp3::type::blob_error));\
test(make_expected(S12b, node_type{{resp3::type::blob_error, 1UL, 0UL, {}}}, {}, resp3::type::blob_error));\
test(make_expected(S12c, result<ignore_t>{}, boost::redis::error::resp3_blob_error));\
BOOST_AUTO_TEST_CASE(parser)
BOOST_AUTO_TEST_CASE(sansio)
{
net::io_context ioc;
auto ex = ioc.get_executor();
#define TEST test_sync
NUMBER_TEST_CONDITIONS(TEST)
#undef TEST
#define TEST test_async
NUMBER_TEST_CONDITIONS(TEST)
#undef TEST
ioc.run();
NUMBER_TEST_CONDITIONS(test_sync)
}
BOOST_AUTO_TEST_CASE(ignore_adapter_simple_error)
{
net::io_context ioc;
std::string rbuffer;
boost::system::error_code ec;
test_stream ts {ioc};
ts.append(S10a);
redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(ignore), ec);
BOOST_CHECK_EQUAL(ec, boost::redis::error::resp3_simple_error);
BOOST_TEST(!rbuffer.empty());
test_sync2(make_expected(S10a, ignore, boost::redis::error::resp3_simple_error));
}
BOOST_AUTO_TEST_CASE(ignore_adapter_blob_error)
{
net::io_context ioc;
std::string rbuffer;
boost::system::error_code ec;
test_stream ts {ioc};
ts.append(S12a);
redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(ignore), ec);
BOOST_CHECK_EQUAL(ec, boost::redis::error::resp3_blob_error);
BOOST_TEST(!rbuffer.empty());
test_sync2(make_expected(S12a, ignore, boost::redis::error::resp3_blob_error));
}
BOOST_AUTO_TEST_CASE(ignore_adapter_no_error)
{
net::io_context ioc;
std::string rbuffer;
boost::system::error_code ec;
test_stream ts {ioc};
ts.append(S05b);
auto const consumed = redis::detail::read(ts, net::dynamic_buffer(rbuffer), adapt2(ignore), ec);
BOOST_TEST(!ec);
BOOST_CHECK_EQUAL(rbuffer.size(), consumed);
test_sync2(make_expected(S05b, ignore));
}
//-----------------------------------------------------------------------------------
@@ -660,7 +575,7 @@ BOOST_AUTO_TEST_CASE(adapter)
using boost::redis::adapter::boost_redis_adapt;
using resp3::type;
boost::system::error_code ec;
error_code ec;
response<std::string, int, ignore_t> resp;
@@ -674,4 +589,3 @@ BOOST_AUTO_TEST_CASE(adapter)
BOOST_CHECK_EQUAL(std::get<1>(resp).value(), 42);
BOOST_TEST(!ec);
}

View File

@@ -1,78 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/detail/read.hpp>
#include <boost/redis/detail/write.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <string>
#include <iostream>
#define BOOST_TEST_MODULE conn-tls
#include <boost/test/included/unit_test.hpp>
#if defined(BOOST_ASIO_HAS_CO_AWAIT)
namespace net = boost::asio;
namespace redis = boost::redis;
using resolver = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::resolver>;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
using boost::redis::adapter::adapt2;
using net::ip::tcp;
using boost::redis::request;
using boost::redis::adapter::result;
using redis::config;
auto co_main(config cfg) -> net::awaitable<void>
{
auto ex = co_await net::this_coro::executor;
resolver resv{ex};
auto const addrs = co_await resv.async_resolve(cfg.addr.host, cfg.addr.port);
tcp_socket socket{ex};
co_await net::async_connect(socket, addrs);
// Creates the request and writes to the socket.
request req;
req.push("HELLO", 3);
req.push("PING", "Hello world");
req.push("QUIT");
co_await redis::detail::async_write(socket, req);
// Responses
std::string buffer;
result<std::string> resp;
std::size_t consumed = 0;
// Reads the responses to all commands in the request.
auto dbuf = net::dynamic_buffer(buffer);
consumed = co_await redis::detail::async_read(socket, dbuf);
dbuf.consume(consumed);
consumed = co_await redis::detail::async_read(socket, dbuf, adapt2(resp));
dbuf.consume(consumed);
consumed = co_await redis::detail::async_read(socket, dbuf);
dbuf.consume(consumed);
std::cout << "Ping: " << resp.value() << std::endl;
}
BOOST_AUTO_TEST_CASE(low_level_async)
{
net::io_context ioc;
net::co_spawn(ioc, co_main({}), net::detached);
ioc.run();
}
#else // defined(BOOST_ASIO_HAS_CO_AWAIT)
BOOST_AUTO_TEST_CASE(low_level_async)
{
}
#endif // defined(BOOST_ASIO_HAS_CO_AWAIT)

View File

@@ -1,61 +0,0 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/detail/read.hpp>
#include <boost/redis/detail/write.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#define BOOST_TEST_MODULE conn-quit
#include <boost/test/included/unit_test.hpp>
#include <string>
#include <iostream>
namespace net = boost::asio;
namespace redis = boost::redis;
using boost::redis::adapter::adapt2;
using boost::redis::request;
using boost::redis::adapter::result;
BOOST_AUTO_TEST_CASE(low_level_sync)
{
try {
std::string const host = "127.0.0.1";
std::string const port = "6379";
net::io_context ioc;
net::ip::tcp::resolver resv{ioc};
auto const res = resv.resolve(host, port);
net::ip::tcp::socket socket{ioc};
net::connect(socket, res);
// Creates the request and writes to the socket.
request req;
req.push("HELLO", 3);
req.push("PING", "Hello world");
req.push("QUIT");
redis::detail::write(socket, req);
std::string buffer;
result<std::string> resp;
std::size_t consumed = 0;
// Reads the responses to all commands in the request.
auto dbuf = net::dynamic_buffer(buffer);
consumed = redis::detail::read(socket, dbuf);
dbuf.consume(consumed);
consumed = redis::detail::read(socket, dbuf, adapt2(resp));
dbuf.consume(consumed);
consumed = redis::detail::read(socket, dbuf);
dbuf.consume(consumed);
std::cout << "Ping: " << resp.value() << std::endl;
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
exit(EXIT_FAILURE);
}
}