2
0
mirror of https://github.com/boostorg/redis.git synced 2026-02-09 11:22:22 +00:00

Make progresses responses.

This commit is contained in:
Marcelo Zimbres
2020-12-13 09:52:35 +01:00
parent 574a9864ae
commit 7fd39496dc
2 changed files with 123 additions and 17 deletions

View File

@@ -71,11 +71,50 @@ net::awaitable<void> example2()
}
}
net::awaitable<void> example3()
{
auto ex = co_await this_coro::executor;
tcp::resolver resv(ex);
auto const r = resv.resolve("127.0.0.1", "6379");
tcp_socket socket {ex};
co_await async_connect(socket, r);
resp::pipeline p;
p.flushall();
p.rpush("key", {1, 2, 3});
p.lrange("key");
p.lrange("key");
p.lrange("key");
co_await async_write(socket, buffer(p.payload));
resp::buffer buffer;
resp::response res1;
co_await resp::async_read(socket, buffer, res1);
co_await resp::async_read(socket, buffer, res1);
resp::response_list<int> res2;
co_await resp::async_read(socket, buffer, res2);
resp::print(res2.result);
resp::response_list<long long> res3;
co_await resp::async_read(socket, buffer, res3);
resp::print(res3.result);
resp::response_list<std::string> res4;
co_await resp::async_read(socket, buffer, res4);
resp::print(res4.result);
}
int main()
{
io_context ioc {1};
co_spawn(ioc, example1(), detached);
co_spawn(ioc, example2(), detached);
//co_spawn(ioc, example1(), detached);
//co_spawn(ioc, example2(), detached);
co_spawn(ioc, example3(), detached);
ioc.run();
}

View File

@@ -26,6 +26,7 @@
#include <string_view>
#include <forward_list>
#include <unordered_set>
#include <charconv>
#include <boost/asio.hpp>
@@ -41,6 +42,61 @@ namespace resp
using buffer = std::string;
struct response_throw {
virtual void select_array(int n) { throw std::runtime_error("select_array: Has not been overridden."); }
virtual void select_push(int n) { throw std::runtime_error("select_push: Has not been overridden."); }
virtual void select_set(int n) { throw std::runtime_error("select_set: Has not been overridden."); }
virtual void select_map(int n) { throw std::runtime_error("select_map: Has not been overridden."); }
virtual void select_attribute(int n) { throw std::runtime_error("select_attribute: Has not been overridden."); }
virtual void on_simple_string(std::string_view s) { throw std::runtime_error("on_simple_string: Has not been overridden."); }
virtual void on_simple_error(std::string_view s) { throw std::runtime_error("on_simple_error: Has not been overridden."); }
virtual void on_number(std::string_view s) { throw std::runtime_error("on_number: Has not been overridden."); }
virtual void on_double(std::string_view s) { throw std::runtime_error("on_double: Has not been overridden."); }
virtual void on_bool(std::string_view s) { throw std::runtime_error("on_bool: Has not been overridden."); }
virtual void on_big_number(std::string_view s) { throw std::runtime_error("on_big_number: Has not been overridden."); }
virtual void on_null() { throw std::runtime_error("on_null: Has not been overridden."); }
virtual void on_blob_error(std::string_view s = {}) { throw std::runtime_error("on_blob_error: Has not been overridden."); }
virtual void on_verbatim_string(std::string_view s = {}) { throw std::runtime_error("on_verbatim_string: Has not been overridden."); }
virtual void on_blob_string(std::string_view s = {}) { throw std::runtime_error("on_blob_string: Has not been overridden."); }
virtual void on_streamed_string_part(std::string_view s = {}) { throw std::runtime_error("on_streamed_string_part: Has not been overridden."); }
};
template <class T>
T to_number(std::string_view s)
{
T n;
auto r = std::from_chars(s.data(),
s.data() + s.size(),
n);
if (r.ec == std::errc::invalid_argument)
throw std::runtime_error("Unable to convert");
return n;
}
template <class T>
struct converter {
template <class U = std::enable_if<std::is_integral<T>::value, T>::type>
static T apply(std::string_view s, U = {})
{ return to_number<T>(s); }
};
template <>
struct converter<std::string> {
static std::string apply(std::string_view s)
{ return std::string {s}; }
};
template <class T, class Allocator = std::allocator<T>>
struct response_list : response_throw {
std::list<T, Allocator> result;
void select_array(int n) override { }
void on_blob_string(std::string_view s) override
{ result.push_back(converter<T>::apply(s)); }
};
// General purpose response. Copies the string reponses in the result
// vector.
struct response_vector {
@@ -87,7 +143,16 @@ std::size_t length(char const* p)
return len;
}
void print(std::vector<std::string> const& v)
template <class T>
void print(std::vector<T> const& v)
{
for (auto const& o : v)
std::cout << o << " ";
std::cout << std::endl;
}
template <class T>
void print(std::list<T> const& v)
{
for (auto const& o : v)
std::cout << o << " ";
@@ -109,6 +174,7 @@ void print_command_raw(std::string const& data, int n)
}
}
template <class Response>
class parser {
public:
enum class bulk
@@ -120,7 +186,7 @@ public:
};
private:
resp::response* res_ = nullptr;
Response* res_ = nullptr;
int depth_ = 0;
int sizes_[6] = {2, 1, 1, 1, 1, 1}; // Streaming will require a bigger integer.
bulk bulk_ = bulk::none;
@@ -233,7 +299,7 @@ private:
}
public:
parser(resp::response* res)
parser(Response* res)
: res_ {res}
{}
@@ -287,16 +353,16 @@ public:
// The parser supports up to 5 levels of nested structures. The first
// element in the sizes stack is a sentinel and must be different from
// 1.
template <class AsyncReadStream>
template <class AsyncReadStream, class Response>
class parse_op {
private:
AsyncReadStream& stream_;
resp::buffer* buf_ = nullptr;
parser parser_;
parser<Response> parser_;
int start_ = 1;
public:
parse_op(AsyncReadStream& stream, resp::buffer* buf, resp::response* res)
parse_op(AsyncReadStream& stream, resp::buffer* buf, Response* res)
: stream_ {stream}
, buf_ {buf}
, parser_ {res}
@@ -309,7 +375,7 @@ public:
{
switch (start_) {
for (;;) {
if (parser_.bulk() == parser::bulk::none) {
if (parser_.bulk() == parser<Response>::bulk::none) {
case 1:
start_ = 0;
net::async_read_until(
@@ -359,18 +425,18 @@ public:
}
};
template <class SyncReadStream>
template <class SyncReadStream, class Response>
auto read(
SyncReadStream& stream,
resp::buffer& buf,
resp::response& res,
Response& res,
boost::system::error_code& ec)
{
parser p {&res};
parser<Response> p {&res};
std::size_t n = 0;
goto start;
do {
if (p.bulk() == parser::bulk::none) {
if (p.bulk() == parser<Response>::bulk::none) {
start:
n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec);
if (ec || n < 3)
@@ -394,12 +460,12 @@ start:
return n;
}
template<class SyncReadStream>
template<class SyncReadStream, class Response>
std::size_t
read(
SyncReadStream& stream,
resp::buffer& buf,
resp::response& res)
Response& res)
{
boost::system::error_code ec;
auto const n = read(stream, buf, res, ec);
@@ -412,20 +478,21 @@ read(
template <
class AsyncReadStream,
class Response,
class CompletionToken =
net::default_completion_token_t<typename AsyncReadStream::executor_type>
>
auto async_read(
AsyncReadStream& stream,
resp::buffer& buffer,
resp::response& res,
Response& res,
CompletionToken&& token =
net::default_completion_token_t<typename AsyncReadStream::executor_type>{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(parse_op<AsyncReadStream> {stream, &buffer, &res},
>(parse_op<AsyncReadStream, Response> {stream, &buffer, &res},
token,
stream);
}