2
0
mirror of https://github.com/boostorg/redis.git synced 2026-02-17 14:02:13 +00:00

Further progresses with response types.

This commit is contained in:
Marcelo Zimbres
2021-01-31 16:12:16 +01:00
parent 6052773347
commit 4ffeda5e3e
6 changed files with 248 additions and 134 deletions

View File

@@ -1,62 +1,61 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <boost/asio.hpp>
#include <aedis/aedis.hpp>
#include <aedis/utils.hpp>
#include <aedis/receiver_print.hpp>
#include <stack>
namespace net = aedis::net;
namespace net = aedis::net;
using tcp = net::ip::tcp;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<tcp::socket>;
namespace this_coro = net::this_coro;
using namespace net;
using namespace aedis;
using tcp = net::ip::tcp;
void fill(resp::request<resp::event>& req)
void fill1(resp::request<resp::event>& req)
{
req.multi();
req.hello();
req.ping();
//req.multi();
req.rpush("list", {1, 2, 3});
req.lrange("list");
req.exec();
req.quit();
//req.exec();
req.ping();
}
net::awaitable<void> example()
{
try {
auto ex = co_await this_coro::executor;
auto ex = co_await net::this_coro::executor;
tcp::resolver resv(ex);
tcp_socket socket {ex};
co_await net::async_connect(socket, resv.resolve("127.0.0.1", "6379"));
auto const r = resv.resolve("127.0.0.1", "6379");
tcp::socket socket {ex};
co_await async_connect(socket, r, net::use_awaitable);
resp::request req;
fill(req);
co_await net::async_write(socket, net::buffer(req.payload));
auto reqs = resp::make_request_queue<resp::event>();
resp::response_buffers resps;
resp::receiver_print recv{resps};
net::steady_timer st{ex};
co_spawn(ex, resp::async_reader(socket, reqs, resps, recv), net::detached);
resp::async_writer(socket, reqs, st, net::detached);
queue_writer(reqs, fill1, st);
net::steady_timer timer(ex, std::chrono::years{1});
co_await timer.async_wait(net::use_awaitable);
std::string buffer;
for (;;) {
resp::response_array<std::string> hello;
co_await resp::async_read(socket, buffer, hello);
resp::print(hello.result);
}
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
std::cout << e.what() << std::endl;
}
}
int main()
{
io_context ioc {1};
co_spawn(ioc, example(), detached);
net::io_context ioc {1};
co_spawn(ioc, example(), net::detached);
ioc.run();
}

View File

@@ -7,6 +7,7 @@
#include <boost/asio.hpp>
#include <aedis/aedis.hpp>
#include <aedis/receiver_print.hpp>
#include <stack>
@@ -37,7 +38,7 @@ operator<<(std::ostream& os, myevent e)
return os;
}
auto fill_req(resp::request<myevent>& req)
auto filler(resp::request<myevent>& req)
{
//req.subscribe("channel");
//req.subscribe("__keyspace@0__:user:*");
@@ -68,10 +69,19 @@ auto fill_req(resp::request<myevent>& req)
//req.del("eee");
}
void fill1(resp::request<myevent>& req)
{
req.multi();
req.rpush("list", {1, 2, 3});
req.lrange("list");
req.exec();
req.ping();
}
net::awaitable<void> subscriber()
{
auto ex = co_await net::this_coro::executor;
try {
auto ex = co_await net::this_coro::executor;
tcp::resolver resv(ex);
auto const r = resv.resolve("127.0.0.1", "6379");
tcp::socket socket {ex};
@@ -84,15 +94,13 @@ net::awaitable<void> subscriber()
co_spawn(
ex,
resp::async_reader(socket, reqs, recv, resps),
resp::async_reader(socket, reqs, resps, recv),
net::detached);
resp::async_writer(socket, reqs, st, net::detached);
auto filler = [](auto& req){fill_req(req);};
for (;;) {
queue_writer(reqs, filler, st);
queue_writer(reqs, fill1, st);
net::steady_timer timer(ex, std::chrono::milliseconds{1000});
co_await timer.async_wait(net::use_awaitable);
}

View File

@@ -253,17 +253,24 @@ auto async_read_type(
stream);
}
struct receiver_ignore {
template <class Event>
void receive(response_id<Event> const&) {}
template <class Event>
void receive_transaction(std::queue<response_id<Event>>) {}
};
template <
class AsyncReadWriteStream,
class Receiver,
class Event,
class ResponseBuffer>
class ResponseBuffer,
class Receiver = receiver_ignore>
net::awaitable<void>
async_reader(
AsyncReadWriteStream& socket,
std::queue<request<Event>>& reqs,
Receiver& recv,
ResponseBuffer& resps)
ResponseBuffer& resps,
Receiver recv = receiver_ignore{})
{
using response_id_type = response_id<Event>;
@@ -284,15 +291,14 @@ async_reader(
// The next two ifs are used to deal with transactions.
if (is_multi || (!trans_empty && !is_exec)) {
// The multi commands always gets a "OK" response and all other
// commands get QUEUED unless the user is e.g. using wrong data types.
auto const* res = cmd == command::multi ? "OK" : "QUEUED";
response_static_string<char, 6> tmp;
response_static_string<6> tmp;
co_await async_read(socket, buffer, tmp, net::use_awaitable);
// Failing to QUEUE a command inside a trasaction is considered an
// application error.
// Failing to QUEUE a command inside a trasaction is
// considered an application error. The multi commands
// always gets a "OK" response and all other commands get
// QUEUED unless the user is e.g. using wrong data types.
auto const* res = cmd == command::multi ? "OK" : "QUEUED";
assert (tmp.result == res);
// Pushes the command in the transction command queue that will be
@@ -344,6 +350,7 @@ async_reader(
buffer,
*tmp,
net::use_awaitable);
recv.receive(id);
if (t != type::push)
@@ -361,30 +368,7 @@ async_reader(
}
}
struct receiver_ignore {
template <class Event>
void receive_transaction(std::queue<response_id<Event>>) { }
template <class Event>
void receive(response_id<Event> const&) { }
};
struct receiver_print {
// The ids in the queue parameter have an unspecified message type.
template <class Event>
void receive_transaction(std::queue<response_id<Event>> ids)
{
while (!std::empty(ids)) {
std::cout << ids.front() << std::endl;
ids.pop();
}
}
template <class Event>
void receive(response_id<Event> const& id)
{ std::cout << id << std::endl; }
};
template <class Event>
template <class Event = event>
std::queue<resp::request<Event>>
make_request_queue()
{

View File

@@ -0,0 +1,94 @@
/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#pragma once
#include <iostream>
#include "type.hpp"
#include "utils.hpp"
#include "response.hpp"
#include "request.hpp"
namespace aedis { namespace resp {
class receiver_print {
private:
response_buffers& buffer_;
public:
receiver_print(response_buffers& buffer)
: buffer_{buffer}
{}
// The ids in the queue parameter have an unspecified message type.
template <class Event>
void receive_transaction(std::queue<response_id<Event>> ids)
{
while (!std::empty(ids)) {
std::cout << ids.front() << std::endl;
ids.pop();
}
}
template <class Event>
void receive(response_id<Event> const& id)
{
buffer_.tree().clear();
std::cout << id;
switch (id.t) {
case type::push:
buffer_.push().clear();
break;
case type::set:
buffer_.set().clear();
break;
case type::map:
buffer_.map().clear();
break;
case type::attribute:
buffer_.attribute().clear();
break;
case type::array:
buffer_.array().clear();
break;
case type::simple_error:
buffer_.simple_error().clear();
break;
case type::simple_string:
buffer_.simple_string().clear();
break;
case type::number:
break;
case type::double_type:
break;
case type::big_number:
buffer_.big_number().clear();
break;
case type::boolean:
break;
case type::blob_error:
buffer_.blob_error().clear();
break;
case type::blob_string:
buffer_.blob_string().clear();
break;
case type::verbatim_string:
buffer_.verbatim_string().clear();
break;
case type::streamed_string_part:
buffer_.streamed_string_part().clear();
break;
default:{}
}
std::cout << std::endl;
}
};
}
}

View File

@@ -39,11 +39,6 @@ inline
void from_string_view(std::string_view s, std::string& r)
{ r = s; }
/* A base class for flat responses which means response with no
* embedded types in themselves. For exaple, a transaction with an
* lrange in it will produce a response that is an array with an
* array. That is not suitable for this class.
*/
class response_base {
protected:
virtual void on_simple_string_impl(std::string_view s)
@@ -100,7 +95,8 @@ public:
virtual ~response_base() {}
};
struct response_ignore : response_base {
class response_ignore : public response_base {
private:
void on_simple_string_impl(std::string_view s) override {}
void on_simple_error_impl(std::string_view s) override {}
void on_number_impl(std::string_view s) override {}
@@ -119,8 +115,8 @@ struct response_ignore : response_base {
void select_attribute_impl(int n) override {}
};
// This response type is able to deal with recursive redis responses as in a
// transaction for example.
// This response type is able to deal with recursive redis responses
// as in a transaction for example.
class response_tree: public response_base {
public:
struct elem {
@@ -183,9 +179,9 @@ public:
};
template <class T>
class response_number : public response_base {
static_assert(std::is_integral<T>::value);
class response_basic_number : public response_base {
private:
static_assert(std::is_integral<T>::value);
void on_number_impl(std::string_view s) override
{ from_string_view(s, result); }
@@ -193,53 +189,74 @@ public:
T result;
};
using response_number = response_basic_number<long long int>;
template<
class CharT = char,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>>
class response_blob_string : public response_base {
class response_basic_blob_string : public response_base {
private:
void add(std::string_view s)
{ from_string_view(s, result); }
void on_blob_string_impl(std::string_view s) override
{ add(s); }
void on_blob_error_impl(std::string_view s) override
{ add(s); }
void on_simple_string_impl(std::string_view s) override
{ add(s); }
void on_simple_error_impl(std::string_view s) override
{ add(s); }
{ from_string_view(s, result); }
public:
std::basic_string<CharT, Traits, Allocator> result;
};
using response_blob_string = response_basic_blob_string<char>;
template<
class CharT = char,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>>
class response_basic_blob_error : public response_base {
private:
void on_blob_error_impl(std::string_view s) override
{ from_string_view(s, result); }
public:
std::basic_string<CharT, Traits, Allocator> result;
};
using response_blob_error = response_basic_blob_error<char>;
template<
class CharT = char,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>
>
class response_simple_string : public response_base {
class response_basic_simple_string : public response_base {
private:
void add(std::string_view s)
{ from_string_view(s, result); }
void on_simple_string_impl(std::string_view s) override
{ add(s); }
{ from_string_view(s, result); }
public:
std::basic_string<CharT, Traits, Allocator> result;
};
using response_simple_string = response_basic_simple_string<char>;
template<
class CharT = char,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>
>
class response_basic_simple_error : public response_base {
private:
void on_simple_error_impl(std::string_view s) override
{ add(s); }
{ from_string_view(s, result); }
public:
std::basic_string<CharT, Traits, Allocator> result;
};
using response_simple_error = response_basic_simple_error<char>;
// Big number use strings at the moment as the underlying storage.
template <
class CharT = char,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>
>
class response_big_number : public response_base {
class response_basic_big_number : public response_base {
private:
void on_big_number_impl(std::string_view s) override
{ from_string_view(s, result); }
@@ -248,13 +265,15 @@ public:
std::basic_string<CharT, Traits, Allocator> result;
};
using response_big_number = response_basic_big_number<char>;
// TODO: Use a double instead of string.
template <
class CharT = char,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>
>
class response_double : public response_base {
class response_basic_double : public response_base {
private:
void on_double_impl(std::string_view s) override
{ from_string_view(s, result); }
@@ -263,6 +282,8 @@ public:
std::basic_string<CharT, Traits, Allocator> result;
};
using response_double = response_basic_double<char>;
template <
class T,
class Allocator = std::allocator<T>>
@@ -286,7 +307,7 @@ template<
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>
>
class response_verbatim_string : public response_base {
class response_basic_verbatim_string : public response_base {
private:
void on_verbatim_string_impl(std::string_view s) override
{ from_string_view(s, result); }
@@ -294,12 +315,14 @@ public:
std::basic_string<CharT, Traits, Allocator> result;
};
using response_verbatim_string = response_basic_verbatim_string<char>;
template<
class CharT = char,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>
>
class response_streamed_string : public response_base {
class response_basic_streamed_string : public response_base {
private:
void on_streamed_string_part_impl(std::string_view s) override
{ result += s; }
@@ -307,6 +330,8 @@ public:
std::basic_string<CharT, Traits, Allocator> result;
};
using response_streamed_string = response_basic_streamed_string<char>;
template <
class Key,
class Compare = std::less<Key>,
@@ -379,6 +404,7 @@ private:
result.emplace_back(std::move(r));
}
// TODO: Call vector reserver.
void on_simple_string_impl(std::string_view s) override { add(s); }
void on_number_impl(std::string_view s) override { add(s); }
void on_double_impl(std::string_view s) override { add(s); }
@@ -405,16 +431,16 @@ using response_flat_set = response_array<T, Allocator>;
template <class T, std::size_t N>
class response_static_array : public response_base {
private:
int i = 0;
int i_ = 0;
void on_blob_string_impl(std::string_view s) override
{ from_string_view(s, result[i++]); }
{ from_string_view(s, result[i_++]); }
void select_array_impl(int n) override { }
public:
std::array<T, N> result;
};
template <class T, std::size_t N>
template <std::size_t N>
class response_static_string : public response_base {
private:
void add(std::string_view s)
@@ -434,10 +460,10 @@ template <
>
class response_static_flat_map : public response_base {
private:
int i = 0;
int i_ = 0;
void add(std::string_view s = {})
{ from_string_view(s, result.at(i++)); }
{ from_string_view(s, result.at(i_++)); }
void on_blob_string_impl(std::string_view s) override
{ add(s); }
void on_number_impl(std::string_view s) override
@@ -474,18 +500,21 @@ private:
response_array<std::string> set_;
response_array<std::string> map_;
response_array<std::string> attribute_;
response_simple_string<char> simple_string_;
response_simple_string<char> simple_error_;
response_number<long long int> number_;
response_double<char> double_;
response_big_number<char> big_number_;
response_blob_string<char> blob_string_;
response_blob_string<char> blob_error_;
response_verbatim_string<char> verbatim_string_;
response_streamed_string<char> streamed_string_part_;
response_simple_string simple_string_;
response_simple_error simple_error_;
response_number number_;
response_double double_;
response_bool boolean_;
response_big_number big_number_;
response_blob_string blob_string_;
response_blob_error blob_error_;
response_verbatim_string verbatim_string_;
response_streamed_string streamed_string_part_;
response_ignore ignore_;
public:
auto& tree() {return tree_.result;};
auto& array() {return array_.result;};
auto const& array() const noexcept {return array_.result;};
@@ -510,6 +539,9 @@ public:
auto& number() {return number_.result;};
auto const& number() const noexcept {return number_.result;};
auto& boolean() {return boolean_.result;};
auto const& boolean() const noexcept {return boolean_.result;};
auto& double_type() {return double_.result;};
auto const& double_type() const noexcept {return double_.result;};
@@ -528,10 +560,6 @@ public:
auto& streamed_string_part() {return streamed_string_part_.result;};
auto const& streamed_string_part() const noexcept {return streamed_string_part_.result;};
// TODO: The types bellow are still missing.
//null
//boolean
// When the id is from a transaction the type of the message is not
// specified.
template <class Event>
@@ -540,7 +568,7 @@ public:
if (id.cmd == command::exec)
return &tree_;
switch (id.type) {
switch (id.t) {
case type::push: return &push_;
case type::set: return &set_;
case type::map: return &map_;
@@ -551,6 +579,7 @@ public:
case type::number: return &number_;
case type::double_type: return &double_;
case type::big_number: return &big_number_;
case type::boolean: return &boolean_;
case type::blob_error: return &blob_error_;
case type::blob_string: return &blob_string_;
case type::verbatim_string: return &verbatim_string_;
@@ -570,7 +599,7 @@ operator<<(std::ostream& os, aedis::resp::response_id<Event> const& id)
os
<< std::left << std::setw(15) << aedis::resp::to_string(id.cmd)
<< std::left << std::setw(20) << aedis::resp::to_string(id.t)
<< std::left << std::setw(20) << id.event
<< std::left << std::setw(20) << (int)id.event
;
return os;
}

View File

@@ -64,9 +64,9 @@ net::awaitable<void> test_list()
}
{ // rpush
resp::response_number<int> res;
resp::response_number res;
co_await resp::async_read(socket, buffer, res);
check_equal(res.result, 6, "rpush");
check_equal(res.result, (long long int)6, "rpush");
}
{ // lrange
@@ -229,25 +229,25 @@ net::awaitable<void> number()
{ // int
std::string cmd {":-3\r\n"};
test_tcp_socket ts {cmd};
resp::response_number<int> res;
resp::response_number res;
co_await resp::async_read(ts, buffer, res);
check_equal(res.result, -3, "number (int)");
check_equal(res.result, (long long int)-3, "number (int)");
}
{ // unsigned
std::string cmd {":3\r\n"};
test_tcp_socket ts {cmd};
resp::response_number<int> res;
resp::response_number res;
co_await resp::async_read(ts, buffer, res);
check_equal(res.result, 3, "number (unsigned)");
check_equal(res.result, (long long int)3, "number (unsigned)");
}
{ // std::size_t
std::string cmd {":1111111\r\n"};
test_tcp_socket ts {cmd};
resp::response_number<int> res;
resp::response_number res;
co_await resp::async_read(ts, buffer, res);
check_equal(res.result, 1111111, "number (std::size_t)");
check_equal(res.result, (long long int)1111111, "number (std::size_t)");
}
}
@@ -321,7 +321,7 @@ net::awaitable<void> simple_error()
{
std::string cmd {"-Error\r\n"};
test_tcp_socket ts {cmd};
resp::response_simple_string res;
resp::response_simple_error res;
co_await resp::async_read(ts, buffer, res);
check_equal(res.result, {"Error"}, "simple_error (message)");
}
@@ -382,7 +382,7 @@ net::awaitable<void> blob_error()
{
std::string cmd {"!21\r\nSYNTAX invalid syntax\r\n"};
test_tcp_socket ts {cmd};
resp::response_blob_string res;
resp::response_blob_error res;
co_await resp::async_read(ts, buffer, res);
check_equal(res.result, {"SYNTAX invalid syntax"}, "blob_error (message)");
}
@@ -390,7 +390,7 @@ net::awaitable<void> blob_error()
{
std::string cmd {"!0\r\n\r\n"};
test_tcp_socket ts {cmd};
resp::response_blob_string res;
resp::response_blob_error res;
co_await resp::async_read(ts, buffer, res);
check_equal(res.result, {}, "blob_error (empty message)");
}