2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00
- Moves response classes to their own header.
- aedis.hpp used only to include other headers like in boost.
- Removes buffer typedef.
- Events example.
- Async example.
- Improvements in the makefile.
- Adds reconnect example.
This commit is contained in:
Marcelo Zimbres
2020-12-21 10:12:57 +01:00
parent be4f3bd18b
commit c5ad08ecb4
15 changed files with 1015 additions and 687 deletions

View File

@@ -18,21 +18,36 @@ CPPFLAGS += -D BOOST_ASIO_NO_TS_EXECUTORS
LDFLAGS += -pthread
all: sync async general
examples =
examples += sync_basic
examples += sync_responses
examples += sync_events
examples += async_basic
examples += async_reconnect
examples += async
tests =
tests += general
remove =
remove += $(examples)
remove += $(tests)
remove += $(addsuffix .o, $(examples))
remove += $(addsuffix .o, $(tests))
remove += Makefile.dep
all: general $(examples)
Makefile.dep:
-$(CXX) -MM -I./include ./examples/*.cpp ./tests/*.cpp > $@
-include Makefile.dep
async: examples/async.cpp
$(CXX) -o $@ $^ $(CPPFLAGS) $(LDFLAGS)
$(examples): % : examples/%.cpp
$(CXX) -o $@ $< $(CPPFLAGS) $(LDFLAGS)
sync: examples/sync.cpp
$(CXX) -o $@ $^ $(CPPFLAGS) $(LDFLAGS)
general: % : tests/general.cpp
$(CXX) -o $@ $^ $(CPPFLAGS) $(LDFLAGS)
$(tests): % : tests/%.cpp
$(CXX) -o $@ $< $(CPPFLAGS) $(LDFLAGS)
.PHONY: check
check: general
@@ -40,5 +55,5 @@ check: general
.PHONY: clean
clean:
rm -f sync sync.o async async.o general general.o Makefile.dep
rm -f $(remove)

219
README.md
View File

@@ -1,89 +1,161 @@
# Aedis
Aedis is a redis client designed for seamless integration with async code while
providing a easy and intuitive interface. To use this library include
`aedis.hpp` in your project.
Aedis is a low level redis client designed for scalability while
providing and to provide an easy and intuitive interface.
## Tutoria and examples
## Tutorial
Below we show how to use the library focused in sync and async code.
We begin with sync events below and jump to async code thereafter.
### Sync
```cpp
void sync_example1()
{
io_context ioc {1};
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
resp::pipeline p;
p.ping();
net::write(socket, buffer(p.payload));
resp::buffer buffer;
resp::response res;
resp::read(socket, buffer, res);
// res.result contains the response as std::vector<std::string>.
}
```
The example above is overly simple. In real world cases it is
necessary, for many reasons to keep reading from the socket, for
example to detect the connection has been lost or to be able to deal
with redis unsolicited events. A more realistic example therefore is
Synchronous code are clear by themselves
```cpp
void sync_example2()
int main()
{
io_context ioc {1};
try {
io_context ioc {1};
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
resp::pipeline p;
p.multi();
p.ping();
p.set("Name", {"Marcelo"});
p.incr("Age");
p.exec();
p.quit();
resp::pipeline p;
p.set("Password", {"12345"});
p.quit();
net::write(socket, buffer(p.payload));
net::write(socket, buffer(p.payload));
resp::buffer buffer;
for (;;) {
boost::system::error_code ec;
resp::response res;
resp::read(socket, buffer, res, ec);
if (ec) {
std::cerr << ec.message() << std::endl;
break;
std::string buffer;
for (;;) {
resp::response_string res;
resp::read(socket, buffer, res);
std::cout << res.result << std::endl;
}
resp::print(res.result);
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}
```
In this example we add more commands to the pipeline, they will be all
sent together to redis improving performance. Second we keep reading
until the socket is closed by redis after it receives the quit
command.
We keep reading from the socket until it is closed as a result of the
quit command. The commands are sent in a pipeline to redis, which
greatly improves performance. Notice also we parse the result in the
buffer resp::reponse_string. This is overly simplistic for most apps.
Typically we will want to parse each response in an appropriate
data structure.
#### Response buffer
Aedis comes with general purpose response that are suitable to parse
directly in C++ built-in data types and containers or on your own. For
example
```cpp
int main()
{
try {
... // Like before
std::string buffer;
resp::response_int<int> list_size; // Parses into an int
resp::read(socket, buffer, list_size);
std::cout << list_size.result << std::endl;
resp::response_list<int> list; // Parses into a std::list<int>
resp::read(socket, buffer, list);
print(list.result);
resp::response_string ok; // Parses into a std::string
resp::read(socket, buffer, ok);
std::cout << ok.result << std::endl;
resp::response noop;
resp::read(socket, buffer, noop);
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}
```
Other response types are available also. Structure the code like in
the example above is usually not feasible as the commands send are
determined dynamically, for that case we have events.
#### Events
To use events, first defined the an enum, for example
```cpp
enum class myevents
{ ignore
, list
, set
};
```
With that you can switch on the appropriate response type easily
```cpp
int main()
{
try {
io_context ioc {1};
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
resp::pipeline<myevents> p;
p.rpush("list", {1, 2, 3});
p.lrange("list", 0, -1, myevents::list);
p.sadd("set", std::set<int>{3, 4, 5});
p.smembers("set", myevents::set);
p.quit();
net::write(socket, buffer(p.payload));
std::string buffer;
for (;;) {
switch (p.events.front()) {
case myevents::list:
{
resp::response_list<int> res;
resp::read(socket, buffer, res);
print(res.result);
} break;
case myevents::set:
{
resp::response_set<int> res;
resp::read(socket, buffer, res);
print(res.result);
} break;
default:
{
resp::response res;
resp::read(socket, buffer, res);
}
}
p.events.pop();
}
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}
```
### Async
The sync examples above are good as introduction and can be also used
in production. However to don't scale, this is specially problematic
on backends. Fourtunately in C++20 it became trivial to convert the
sync into asyn code. The example below shows an example.
The sync examples above are good as introduction and can be also
useful in production. However, sync code doesn't scale well, this is
specially problematic on backends. Fortunately in C++20 it became
trivial to convert sync into asyn code. The example below shows
the async version of our first sync example.
```cpp
net::awaitable<void> async_example1()
net::awaitable<void> example1()
{
auto ex = co_await this_coro::executor;
@@ -93,31 +165,22 @@ net::awaitable<void> async_example1()
tcp_socket socket {ex};
co_await async_connect(socket, r);
std::map<std::string, std::string> map
{ {{"Name"}, {"Marcelo"}}
, {{"Education"}, {"Physics"}}
, {{"Job"}, {"Programmer"}}
};
resp::pipeline p;
p.hset("map", map);
p.hincrby("map", "Age", 40);
p.hmget("map", {"Name", "Education", "Job"});
p.set("Password", {"12345"});
p.quit();
co_await async_write(socket, buffer(p.payload));
resp::buffer buffer;
std::string buffer;
for (;;) {
resp::response res;
resp::response_string res;
co_await resp::async_read(socket, buffer, res);
resp::print(res.res);
std::cout << res.result << std::endl;
}
}
```
Though short the example above ilustrates many important points
* STL containers are suported when appropriate.
* Commands are sent to redis in pipeline to improve performance.
Basically, we only have to replace read with async_read and use the
co_await keyword.
## Sentinel support

11
doc/asio_and_threads.md Normal file
View File

@@ -0,0 +1,11 @@
A question the arises frequently is how to do multi-threading properly
with ASIO. They can all be classified according to whether
io_context::run() is called from multiple or a single thread.
## Calling io_context::run() from one thread.
This case has a special case a single threaded app.
## Calling io_context::run() from multiple threads.

View File

@@ -41,7 +41,7 @@ net::awaitable<void> example1()
co_await async_write(socket, buffer(p.payload));
resp::buffer buffer;
std::string buffer;
for (;;) {
resp::response_vector<std::string> res;
co_await resp::async_read(socket, buffer, res);
@@ -63,7 +63,7 @@ net::awaitable<void> example2()
p.subscribe("channel");
co_await async_write(socket, buffer(p.payload));
resp::buffer buffer;
std::string buffer;
for (;;) {
resp::response_vector<std::string> res;
co_await resp::async_read(socket, buffer, res);
@@ -94,7 +94,7 @@ net::awaitable<void> example3()
co_await async_write(socket, buffer(p.payload));
resp::buffer buffer;
std::string buffer;
{ // flushall
resp::response_string res;

47
examples/async_basic.cpp Normal file
View File

@@ -0,0 +1,47 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <boost/asio.hpp>
#include <aedis/aedis.hpp>
namespace net = aedis::net;
using tcp = net::ip::tcp;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<tcp::socket>;
namespace this_coro = net::this_coro;
using namespace net;
using namespace aedis;
net::awaitable<void> example1()
{
resp::pipeline p;
p.set("Password", {"12345"});
p.quit();
auto ex = co_await this_coro::executor;
tcp::resolver resv(ex);
auto const r = resv.resolve("127.0.0.1", "6379");
tcp_socket socket {ex};
co_await async_connect(socket, r);
co_await async_write(socket, buffer(p.payload));
std::string buffer;
for (;;) {
resp::response_string res;
co_await resp::async_read(socket, buffer, res);
std::cout << res.result << std::endl;
}
}
int main()
{
io_context ioc {1};
co_spawn(ioc, example1(), detached);
ioc.run();
}

View File

@@ -0,0 +1,56 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <boost/asio.hpp>
#include <aedis/aedis.hpp>
namespace net = aedis::net;
using tcp = net::ip::tcp;
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<tcp::socket>;
using stimer = net::use_awaitable_t<>::as_default_on_t<net::steady_timer>;
namespace this_coro = net::this_coro;
using namespace net;
using namespace aedis;
net::awaitable<void> example1()
{
auto ex = co_await this_coro::executor;
for (;;) {
try {
resp::pipeline p;
p.set("Password", {"12345"});
p.quit();
tcp::resolver resv(ex);
auto const r = resv.resolve("127.0.0.1", "6379");
tcp_socket socket {ex};
co_await async_connect(socket, r);
co_await async_write(socket, buffer(p.payload));
std::string buffer;
for (;;) {
resp::response_string res;
co_await resp::async_read(socket, buffer, res);
std::cout << res.result << std::endl;
}
} catch (std::exception const& e) {
stimer timer(ex);
timer.expires_after(std::chrono::seconds{2});
co_await timer.async_wait();
}
}
}
int main()
{
io_context ioc {1};
co_spawn(ioc, example1(), detached);
ioc.run();
}

View File

@@ -1,71 +0,0 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <aedis/aedis.hpp>
namespace net = aedis::net;
using namespace net;
using namespace aedis;
void sync_example1()
{
io_context ioc {1};
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
resp::pipeline p;
p.ping();
p.quit();
net::write(socket, buffer(p.payload));
resp::buffer buffer;
resp::response_vector<std::string> res;
resp::read(socket, buffer, res);
print(res.result);
}
void sync_example2()
{
io_context ioc {1};
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
resp::pipeline p;
p.multi();
p.ping();
p.set("Name", {"Marcelo"});
p.incr("Age");
p.exec();
p.quit();
net::write(socket, buffer(p.payload));
resp::buffer buffer;
for (;;) {
boost::system::error_code ec;
resp::response_vector<std::string> res;
resp::read(socket, buffer, res, ec);
if (ec) {
std::cerr << ec.message() << std::endl;
break;
}
print(res.result);
}
}
int main()
{
sync_example1();
sync_example2();
}

38
examples/sync_basic.cpp Normal file
View File

@@ -0,0 +1,38 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <aedis/aedis.hpp>
namespace net = aedis::net;
using namespace net;
using namespace aedis;
int main()
{
try {
resp::pipeline p;
p.set("Password", {"12345"});
p.quit();
io_context ioc {1};
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
net::write(socket, buffer(p.payload));
std::string buffer;
for (;;) {
resp::response_string res;
resp::read(socket, buffer, res);
std::cout << res.result << std::endl;
}
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}

64
examples/sync_events.cpp Normal file
View File

@@ -0,0 +1,64 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <aedis/aedis.hpp>
namespace net = aedis::net;
using namespace net;
using namespace aedis;
enum class myevents
{ ignore
, list
, set
};
int main()
{
try {
resp::pipeline<myevents> p;
p.rpush("list", {1, 2, 3});
p.lrange("list", 0, -1, myevents::list);
p.sadd("set", std::set<int>{3, 4, 5});
p.smembers("set", myevents::set);
p.quit();
io_context ioc {1};
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
net::write(socket, buffer(p.payload));
std::string buffer;
for (;;) {
switch (p.events.front()) {
case myevents::list:
{
resp::response_list<int> res;
resp::read(socket, buffer, res);
print(res.result);
} break;
case myevents::set:
{
resp::response_set<int> res;
resp::read(socket, buffer, res);
print(res.result);
} break;
default:
{
resp::response res;
resp::read(socket, buffer, res);
}
}
p.events.pop();
}
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}

View File

@@ -0,0 +1,49 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <aedis/aedis.hpp>
namespace net = aedis::net;
using namespace net;
using namespace aedis;
int main()
{
try {
resp::pipeline p;
p.rpush("list", {1, 2, 3});
p.lrange("list");
p.quit();
io_context ioc {1};
tcp::resolver resv(ioc);
tcp::socket socket {ioc};
net::connect(socket, resv.resolve("127.0.0.1", "6379"));
net::write(socket, buffer(p.payload));
std::string buffer;
resp::response_int<int> list_size;
resp::read(socket, buffer, list_size);
std::cout << list_size.result << std::endl;
resp::response_list<int> list;
resp::read(socket, buffer, list);
print(list.result);
resp::response_string ok;
resp::read(socket, buffer, ok);
std::cout << ok.result << std::endl;
resp::response noop;
resp::read(socket, buffer, noop);
} catch (std::exception const& e) {
std::cerr << e.what() << std::endl;
}
}

View File

@@ -7,516 +7,6 @@
#pragma once
#include <set>
#include <list>
#include <array>
#include <vector>
#include <string>
#include <cstdio>
#include <utility>
#include <cstdlib>
#include <cstring>
#include <numeric>
#include <iostream>
#include <algorithm>
#include <functional>
#include <type_traits>
#include <string_view>
#include <charconv>
#include <boost/asio.hpp>
#include "pipeline.hpp"
template <class Iter>
void print(Iter begin, Iter end)
{
for (; begin != end; ++begin)
std::cout << *begin << " ";
std::cout << std::endl;
}
template <class Range>
void print(Range const& v)
{
using std::cbegin;
using std::cend;
print(cbegin(v), cend(v));
}
namespace aedis
{
namespace net = boost::asio;
namespace ip = net::ip;
using tcp = ip::tcp;
namespace resp
{
using buffer = std::string;
struct response_throw {
virtual void select_array(int n) { throw std::runtime_error("select_array: Has not been overridden."); }
virtual void select_push(int n) { throw std::runtime_error("select_push: Has not been overridden."); }
virtual void select_set(int n) { throw std::runtime_error("select_set: Has not been overridden."); }
virtual void select_map(int n) { throw std::runtime_error("select_map: Has not been overridden."); }
virtual void select_attribute(int n) { throw std::runtime_error("select_attribute: Has not been overridden."); }
virtual void on_simple_string(std::string_view s) { throw std::runtime_error("on_simple_string: Has not been overridden."); }
virtual void on_simple_error(std::string_view s) { throw std::runtime_error("on_simple_error: Has not been overridden."); }
virtual void on_number(std::string_view s) { throw std::runtime_error("on_number: Has not been overridden."); }
virtual void on_double(std::string_view s) { throw std::runtime_error("on_double: Has not been overridden."); }
virtual void on_bool(std::string_view s) { throw std::runtime_error("on_bool: Has not been overridden."); }
virtual void on_big_number(std::string_view s) { throw std::runtime_error("on_big_number: Has not been overridden."); }
virtual void on_null() { throw std::runtime_error("on_null: Has not been overridden."); }
virtual void on_blob_error(std::string_view s = {}) { throw std::runtime_error("on_blob_error: Has not been overridden."); }
virtual void on_verbatim_string(std::string_view s = {}) { throw std::runtime_error("on_verbatim_string: Has not been overridden."); }
virtual void on_blob_string(std::string_view s = {}) { throw std::runtime_error("on_blob_string: Has not been overridden."); }
virtual void on_streamed_string_part(std::string_view s = {}) { throw std::runtime_error("on_streamed_string_part: Has not been overridden."); }
};
template <class T>
std::enable_if<std::is_integral<T>::value, void>::type
from_string_view(std::string_view s, T& n)
{
auto r = std::from_chars(s.data(), s.data() + s.size(), n);
if (r.ec == std::errc::invalid_argument)
throw std::runtime_error("from_chars: Unable to convert");
}
void from_string_view(std::string_view s, std::string& r)
{ r = s; }
template <class T, class Allocator = std::allocator<T>>
struct response_list : response_throw {
std::list<T, Allocator> result;
void select_array(int n) override { }
void on_blob_string(std::string_view s) override
{
T r;
from_string_view(s, r);
result.push_back(std::move(r));
}
};
template <class T>
struct response_int : response_throw {
T result;
void on_number(std::string_view s) override
{ from_string_view(s, result); }
};
template<
class CharT,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>>
struct response_basic_string : response_throw {
std::basic_string<CharT, Traits, Allocator> result;
void on_simple_string(std::string_view s)
{ from_string_view(s, result); }
};
using response_string = response_basic_string<char>;
template<
class Key,
class Compare = std::less<Key>,
class Allocator = std::allocator<Key>>
struct response_set : response_throw {
std::set<Key, Compare, Allocator> result;
void select_array(int n) override { }
void select_set(int n) override { }
void on_blob_string(std::string_view s) override
{
Key r;
from_string_view(s, r);
result.insert(std::end(result), std::move(r));
}
};
template <class T>
struct response_vector : response_throw {
private:
void add(std::string_view s = {})
{
T r;
from_string_view(s, r);
result.emplace_back(std::move(r));
}
public:
std::vector<T> result;
void clear() { result.clear(); }
auto size() const noexcept { return std::size(result); }
void select_array(int n) override { }
void select_push(int n) override { }
void select_set(int n) override { }
void select_map(int n) override { }
void select_attribute(int n) override { }
void on_simple_string(std::string_view s) override { add(s); }
void on_simple_error(std::string_view s) override { add(s); }
void on_number(std::string_view s) override { add(s); }
void on_double(std::string_view s) override { add(s); }
void on_bool(std::string_view s) override { add(s); }
void on_big_number(std::string_view s) override { add(s); }
void on_null() override { add(); }
void on_blob_error(std::string_view s = {}) override { add(s); }
void on_verbatim_string(std::string_view s = {}) override { add(s); }
void on_blob_string(std::string_view s = {}) override { add(s); }
void on_streamed_string_part(std::string_view s = {}) override { add(s); }
};
// Converts a decimal number in ascii format to an integer.
inline
std::size_t length(char const* p)
{
std::size_t len = 0;
while (*p != '\r') {
len = (10 * len) + (*p - '0');
p++;
}
return len;
}
void print_command_raw(std::string const& data, int n)
{
for (int i = 0; i < n; ++i) {
if (data[i] == '\n') {
std::cout << "\\n";
continue;
}
if (data[i] == '\r') {
std::cout << "\\r";
continue;
}
std::cout << data[i];
}
}
template <class Response>
class parser {
public:
enum class bulk
{ blob_error
, verbatim_string
, blob_string
, streamed_string_part
, none
};
private:
Response* res_ = nullptr;
int depth_ = 0;
int sizes_[6] = {2, 1, 1, 1, 1, 1}; // Streaming will require a bigger integer.
bulk bulk_ = bulk::none;
int bulk_length_ = std::numeric_limits<int>::max();
auto on_array_impl(char const* data, int m = 1)
{
auto const l = length(data + 1);
if (l == 0) {
--sizes_[depth_];
return l;
}
auto const size = m * l;
sizes_[++depth_] = size;
return size;
}
void on_array(char const* data)
{ res_->select_array(on_array_impl(data, 1)); }
void on_push(char const* data)
{ res_->select_push(on_array_impl(data, 1)); }
void on_set(char const* data)
{ res_->select_set(on_array_impl(data, 1)); }
void on_map(char const* data)
{ res_->select_map(on_array_impl(data, 2)); }
void on_attribute(char const* data)
{ res_->select_attribute(on_array_impl(data, 2)); }
void on_null()
{ res_->on_null(); --sizes_[depth_]; }
auto handle_simple_string(char const* data, std::size_t n)
{
--sizes_[depth_];
return std::string_view {data + 1, n - 3};
}
void on_simple_string(char const* data, std::size_t n)
{ res_->on_simple_string(handle_simple_string(data, n)); }
void on_simple_error(char const* data, std::size_t n)
{ res_->on_simple_error(handle_simple_string(data, n)); }
void on_number(char const* data, std::size_t n)
{ res_->on_number(handle_simple_string(data, n)); }
void on_double(char const* data, std::size_t n)
{ res_->on_double(handle_simple_string(data, n)); }
void on_boolean(char const* data, std::size_t n)
{ res_->on_bool(handle_simple_string(data, n)); }
void on_big_number(char const* data, std::size_t n)
{ res_->on_big_number(handle_simple_string(data, n)); }
void on_bulk(bulk b, std::string_view s = {})
{
switch (b) {
case bulk::blob_error: res_->on_blob_error(s); break;
case bulk::verbatim_string: res_->on_verbatim_string(s); break;
case bulk::blob_string: res_->on_blob_string(s); break;
case bulk::streamed_string_part:
{
if (std::empty(s)) {
sizes_[depth_] = 1;
} else {
res_->on_streamed_string_part(s);
}
} break;
default: assert(false);
}
--sizes_[depth_];
}
auto on_blob_error_impl(char const* data, bulk b)
{
auto const l = length(data + 1);
if (l == -1 || l == 0) {
on_bulk(b);
return bulk::none;
}
bulk_length_ = l;
return b;
}
auto on_streamed_string_size(char const* data)
{ return on_blob_error_impl(data, bulk::streamed_string_part); }
auto on_blob_error(char const* data)
{ return on_blob_error_impl(data, bulk::blob_error); }
auto on_verbatim_string(char const* data)
{ return on_blob_error_impl(data, bulk::verbatim_string); }
auto on_blob_string(char const* data)
{
if (*(data + 1) == '?') {
sizes_[++depth_] = std::numeric_limits<int>::max();
return bulk::none;
}
return on_blob_error_impl(data, bulk::blob_string);
}
public:
parser(Response* res)
: res_ {res}
{}
std::size_t advance(char const* data, std::size_t n)
{
auto next = bulk::none;
if (bulk_ != bulk::none) {
n = bulk_length_ + 2;
on_bulk(bulk_, {data, (std::size_t)bulk_length_});
} else {
if (sizes_[depth_] != 0) {
switch (*data) {
case '!': next = on_blob_error(data); break;
case '=': next = on_verbatim_string(data); break;
case '$': next = on_blob_string(data); break;
case ';': next = on_streamed_string_size(data); break;
case '-': on_simple_error(data, n); break;
case ':': on_number(data, n); break;
case ',': on_double(data, n); break;
case '#': on_boolean(data, n); break;
case '(': on_big_number(data, n); break;
case '+': on_simple_string(data, n); break;
case '_': on_null(); break;
case '>': on_push(data); break;
case '~': on_set(data); break;
case '*': on_array(data); break;
case '|': on_attribute(data); break;
case '%': on_map(data); break;
default: assert(false);
}
}
}
while (sizes_[depth_] == 0)
--sizes_[--depth_];
bulk_ = next;
return n;
}
auto done() const noexcept
{ return depth_ == 0 && bulk_ == bulk::none; }
auto bulk() const noexcept
{ return bulk_; }
auto bulk_length() const noexcept
{ return bulk_length_; }
};
// The parser supports up to 5 levels of nested structures. The first
// element in the sizes stack is a sentinel and must be different from
// 1.
template <class AsyncReadStream, class Response>
class parse_op {
private:
AsyncReadStream& stream_;
resp::buffer* buf_ = nullptr;
parser<Response> parser_;
int start_ = 1;
public:
parse_op(AsyncReadStream& stream, resp::buffer* buf, Response* res)
: stream_ {stream}
, buf_ {buf}
, parser_ {res}
{ }
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
switch (start_) {
for (;;) {
if (parser_.bulk() == parser<Response>::bulk::none) {
case 1:
start_ = 0;
net::async_read_until(
stream_,
net::dynamic_buffer(*buf_),
"\r\n",
std::move(self));
return;
}
// On a bulk read we can't read until delimiter since the
// payload may contain the delimiter itself so we have to
// read the whole chunk. However if the bulk blob is small
// enough it may be already on the buffer buf_ we read
// last time. If it is, there is no need of initiating
// another async op otherwise we have to read the
// missing bytes.
if (std::ssize(*buf_) < (parser_.bulk_length() + 2)) {
start_ = 0;
auto const s = std::ssize(*buf_);
auto const l = parser_.bulk_length();
buf_->resize(l + 2);
net::async_read(
stream_,
net::buffer(buf_->data() + s, l + 2 - s),
net::transfer_all(),
std::move(self));
return;
}
default:
{
// The condition below is wrong. it must be n < 3 for case 1
// and n < 2 for the async_read.
if (ec || n < 3)
return self.complete(ec);
n = parser_.advance(buf_->data(), n);
buf_->erase(0, n);
if (parser_.done())
return self.complete({});
}
}
}
}
};
template <class SyncReadStream, class Response>
auto read(
SyncReadStream& stream,
resp::buffer& buf,
Response& res,
boost::system::error_code& ec)
{
parser<Response> p {&res};
std::size_t n = 0;
do {
if (p.bulk() == parser<Response>::bulk::none) {
n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec);
if (ec || n < 3)
return n;
} else {
auto const s = std::ssize(buf);
auto const l = p.bulk_length();
if (s < (l + 2)) {
buf.resize(l + 2);
n = net::read(stream, net::buffer(buf.data() + s, l + 2 - s));
if (ec || n < 2)
return n;
}
}
n = p.advance(buf.data(), n);
buf.erase(0, n);
} while (!p.done());
return n;
}
template<class SyncReadStream, class Response>
std::size_t
read(
SyncReadStream& stream,
resp::buffer& buf,
Response& res)
{
boost::system::error_code ec;
auto const n = read(stream, buf, res, ec);
if (ec)
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
return n;
}
template <
class AsyncReadStream,
class Response,
class CompletionToken =
net::default_completion_token_t<typename AsyncReadStream::executor_type>
>
auto async_read(
AsyncReadStream& stream,
resp::buffer& buffer,
Response& res,
CompletionToken&& token =
net::default_completion_token_t<typename AsyncReadStream::executor_type>{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(parse_op<AsyncReadStream, Response> {stream, &buffer, &res},
token,
stream);
}
} // resp
} // aedis
#include <aedis/resp.hpp>
#include <aedis/pipeline.hpp>
#include <aedis/response.hpp>

View File

@@ -292,14 +292,13 @@ public:
events.push(e);
}
// TODO: Remove this, use ranges instead.
template <class Key, class T, class Compare, class Allocator>
template <class Range>
auto
hset(std::string const& key,
std::map<Key, T, Compare, Allocator> const& m,
Event e = Event::ignore)
hset(std::string const& key, Range const& r, Event e = Event::ignore)
{
resp::assemble(payload, "HSET", {key}, std::cbegin(m), std::cend(m), 2);
using std::cbegin;
using std::cend;
resp::assemble(payload, "HSET", {key}, std::cbegin(r), std::cend(r), 2);
events.push(e);
}
@@ -377,14 +376,13 @@ public:
events.push(e);
}
// TODO: Remove this overload and use ranges.
template <class Key, class T, class Compare, class Allocator>
template <class Range>
auto
zadd(std::initializer_list<std::string> key,
std::map<Key, T, Compare, Allocator> const& m,
Range const& r,
Event e = Event::ignore)
{
resp::assemble(payload, "ZADD", key, std::cbegin(m), std::cend(m), 2);
resp::assemble(payload, "ZADD", key, std::cbegin(r), std::cend(r), 2);
events.push(e);
}

394
include/aedis/resp.hpp Normal file
View File

@@ -0,0 +1,394 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#pragma once
#include <string>
#include <cstdio>
#include <utility>
#include <cstdlib>
#include <cstring>
#include <numeric>
#include <iostream>
#include <algorithm>
#include <functional>
#include <type_traits>
#include <string_view>
#include <charconv>
#include <boost/asio.hpp>
namespace aedis
{
namespace net = boost::asio;
namespace ip = net::ip;
using tcp = ip::tcp;
namespace resp
{
// Converts a decimal number in ascii format to an integer.
inline
std::size_t length(char const* p)
{
std::size_t len = 0;
while (*p != '\r') {
len = (10 * len) + (*p - '0');
p++;
}
return len;
}
template <class Response>
class parser {
public:
enum class bulk
{ blob_error
, verbatim_string
, blob_string
, streamed_string_part
, none
};
private:
Response* res_ = nullptr;
int depth_ = 0;
int sizes_[6] = {2, 1, 1, 1, 1, 1}; // Streaming will require a bigger integer.
bulk bulk_ = bulk::none;
int bulk_length_ = std::numeric_limits<int>::max();
auto on_array_impl(char const* data, int m = 1)
{
auto const l = length(data + 1);
if (l == 0) {
--sizes_[depth_];
return l;
}
auto const size = m * l;
sizes_[++depth_] = size;
return size;
}
void on_array(char const* data)
{ res_->select_array(on_array_impl(data, 1)); }
void on_push(char const* data)
{ res_->select_push(on_array_impl(data, 1)); }
void on_set(char const* data)
{ res_->select_set(on_array_impl(data, 1)); }
void on_map(char const* data)
{ res_->select_map(on_array_impl(data, 2)); }
void on_attribute(char const* data)
{ res_->select_attribute(on_array_impl(data, 2)); }
void on_null()
{ res_->on_null(); --sizes_[depth_]; }
auto handle_simple_string(char const* data, std::size_t n)
{
--sizes_[depth_];
return std::string_view {data + 1, n - 3};
}
void on_simple_string(char const* data, std::size_t n)
{ res_->on_simple_string(handle_simple_string(data, n)); }
void on_simple_error(char const* data, std::size_t n)
{ res_->on_simple_error(handle_simple_string(data, n)); }
void on_number(char const* data, std::size_t n)
{ res_->on_number(handle_simple_string(data, n)); }
void on_double(char const* data, std::size_t n)
{ res_->on_double(handle_simple_string(data, n)); }
void on_boolean(char const* data, std::size_t n)
{ res_->on_bool(handle_simple_string(data, n)); }
void on_big_number(char const* data, std::size_t n)
{ res_->on_big_number(handle_simple_string(data, n)); }
void on_bulk(bulk b, std::string_view s = {})
{
switch (b) {
case bulk::blob_error: res_->on_blob_error(s); break;
case bulk::verbatim_string: res_->on_verbatim_string(s); break;
case bulk::blob_string: res_->on_blob_string(s); break;
case bulk::streamed_string_part:
{
if (std::empty(s)) {
sizes_[depth_] = 1;
} else {
res_->on_streamed_string_part(s);
}
} break;
default: assert(false);
}
--sizes_[depth_];
}
auto on_blob_error_impl(char const* data, bulk b)
{
auto const l = length(data + 1);
if (l == -1 || l == 0) {
on_bulk(b);
return bulk::none;
}
bulk_length_ = l;
return b;
}
auto on_streamed_string_size(char const* data)
{ return on_blob_error_impl(data, bulk::streamed_string_part); }
auto on_blob_error(char const* data)
{ return on_blob_error_impl(data, bulk::blob_error); }
auto on_verbatim_string(char const* data)
{ return on_blob_error_impl(data, bulk::verbatim_string); }
auto on_blob_string(char const* data)
{
if (*(data + 1) == '?') {
sizes_[++depth_] = std::numeric_limits<int>::max();
return bulk::none;
}
return on_blob_error_impl(data, bulk::blob_string);
}
public:
parser(Response* res)
: res_ {res}
{}
std::size_t advance(char const* data, std::size_t n)
{
auto next = bulk::none;
if (bulk_ != bulk::none) {
n = bulk_length_ + 2;
on_bulk(bulk_, {data, (std::size_t)bulk_length_});
} else {
if (sizes_[depth_] != 0) {
switch (*data) {
case '!': next = on_blob_error(data); break;
case '=': next = on_verbatim_string(data); break;
case '$': next = on_blob_string(data); break;
case ';': next = on_streamed_string_size(data); break;
case '-': on_simple_error(data, n); break;
case ':': on_number(data, n); break;
case ',': on_double(data, n); break;
case '#': on_boolean(data, n); break;
case '(': on_big_number(data, n); break;
case '+': on_simple_string(data, n); break;
case '_': on_null(); break;
case '>': on_push(data); break;
case '~': on_set(data); break;
case '*': on_array(data); break;
case '|': on_attribute(data); break;
case '%': on_map(data); break;
default: assert(false);
}
}
}
while (sizes_[depth_] == 0)
--sizes_[--depth_];
bulk_ = next;
return n;
}
auto done() const noexcept
{ return depth_ == 0 && bulk_ == bulk::none; }
auto bulk() const noexcept
{ return bulk_; }
auto bulk_length() const noexcept
{ return bulk_length_; }
};
void print_command_raw(std::string const& data, int n)
{
for (int i = 0; i < n; ++i) {
if (data[i] == '\n') {
std::cout << "\\n";
continue;
}
if (data[i] == '\r') {
std::cout << "\\r";
continue;
}
std::cout << data[i];
}
}
// The parser supports up to 5 levels of nested structures. The first
// element in the sizes stack is a sentinel and must be different from
// 1.
template <
class AsyncReadStream,
class Storage,
class Response>
class parse_op {
private:
AsyncReadStream& stream_;
Storage* buf_ = nullptr;
parser<Response> parser_;
int start_ = 1;
public:
parse_op(AsyncReadStream& stream, Storage* buf, Response* res)
: stream_ {stream}
, buf_ {buf}
, parser_ {res}
{ }
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
switch (start_) {
for (;;) {
if (parser_.bulk() == parser<Response>::bulk::none) {
case 1:
start_ = 0;
net::async_read_until(
stream_,
net::dynamic_buffer(*buf_),
"\r\n",
std::move(self));
return;
}
// On a bulk read we can't read until delimiter since the
// payload may contain the delimiter itself so we have to
// read the whole chunk. However if the bulk blob is small
// enough it may be already on the buffer buf_ we read
// last time. If it is, there is no need of initiating
// another async op otherwise we have to read the
// missing bytes.
if (std::ssize(*buf_) < (parser_.bulk_length() + 2)) {
start_ = 0;
auto const s = std::ssize(*buf_);
auto const l = parser_.bulk_length();
buf_->resize(l + 2);
net::async_read(
stream_,
net::buffer(buf_->data() + s, l + 2 - s),
net::transfer_all(),
std::move(self));
return;
}
default:
{
// The condition below is wrong. it must be n < 3 for case 1
// and n < 2 for the async_read.
if (ec || n < 3)
return self.complete(ec);
n = parser_.advance(buf_->data(), n);
buf_->erase(0, n);
if (parser_.done())
return self.complete({});
}
}
}
}
};
template <
class SyncReadStream,
class Storage,
class Response>
auto read(
SyncReadStream& stream,
Storage& buf,
Response& res,
boost::system::error_code& ec)
{
parser<Response> p {&res};
std::size_t n = 0;
do {
if (p.bulk() == parser<Response>::bulk::none) {
n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec);
if (ec || n < 3)
return n;
} else {
auto const s = std::ssize(buf);
auto const l = p.bulk_length();
if (s < (l + 2)) {
buf.resize(l + 2);
n = net::read(stream, net::buffer(buf.data() + s, l + 2 - s));
if (ec || n < 2)
return n;
}
}
n = p.advance(buf.data(), n);
buf.erase(0, n);
} while (!p.done());
return n;
}
template<
class SyncReadStream,
class Storage,
class Response>
std::size_t
read(
SyncReadStream& stream,
Storage& buf,
Response& res)
{
boost::system::error_code ec;
auto const n = read(stream, buf, res, ec);
if (ec)
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
return n;
}
template <
class AsyncReadStream,
class Storage,
class Response,
class CompletionToken =
net::default_completion_token_t<typename AsyncReadStream::executor_type>
>
auto async_read(
AsyncReadStream& stream,
Storage& buffer,
Response& res,
CompletionToken&& token =
net::default_completion_token_t<typename AsyncReadStream::executor_type>{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(parse_op<AsyncReadStream, Storage, Response> {stream, &buffer, &res},
token,
stream);
}
} // resp
} // aedis

174
include/aedis/response.hpp Normal file
View File

@@ -0,0 +1,174 @@
/* Copyright (c) 2019 - 2020 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#pragma once
#include <set>
#include <list>
#include <vector>
#include <string>
#include <numeric>
#include <type_traits>
#include <string_view>
#include <charconv>
template <class Iter>
void print(Iter begin, Iter end)
{
for (; begin != end; ++begin)
std::cout << *begin << " ";
std::cout << std::endl;
}
template <class Range>
void print(Range const& v)
{
using std::cbegin;
using std::cend;
print(cbegin(v), cend(v));
}
namespace aedis { namespace resp {
struct response_noop {
virtual void select_array(int n) {}
virtual void select_push(int n) {}
virtual void select_set(int n) {}
virtual void select_map(int n) {}
virtual void select_attribute(int n) {}
virtual void on_simple_string(std::string_view s) {}
virtual void on_simple_error(std::string_view s) {}
virtual void on_number(std::string_view s) {}
virtual void on_double(std::string_view s) {}
virtual void on_bool(std::string_view s) {}
virtual void on_big_number(std::string_view s) {}
virtual void on_null() {}
virtual void on_blob_error(std::string_view s = {}) {}
virtual void on_verbatim_string(std::string_view s = {}) {}
virtual void on_blob_string(std::string_view s = {}) {}
virtual void on_streamed_string_part(std::string_view s = {}) {}
};
using response = response_noop;
struct response_throw {
virtual void select_array(int n) { throw std::runtime_error("select_array: Has not been overridden."); }
virtual void select_push(int n) { throw std::runtime_error("select_push: Has not been overridden."); }
virtual void select_set(int n) { throw std::runtime_error("select_set: Has not been overridden."); }
virtual void select_map(int n) { throw std::runtime_error("select_map: Has not been overridden."); }
virtual void select_attribute(int n) { throw std::runtime_error("select_attribute: Has not been overridden."); }
virtual void on_simple_string(std::string_view s) { throw std::runtime_error("on_simple_string: Has not been overridden."); }
virtual void on_simple_error(std::string_view s) { throw std::runtime_error("on_simple_error: Has not been overridden."); }
virtual void on_number(std::string_view s) { throw std::runtime_error("on_number: Has not been overridden."); }
virtual void on_double(std::string_view s) { throw std::runtime_error("on_double: Has not been overridden."); }
virtual void on_bool(std::string_view s) { throw std::runtime_error("on_bool: Has not been overridden."); }
virtual void on_big_number(std::string_view s) { throw std::runtime_error("on_big_number: Has not been overridden."); }
virtual void on_null() { throw std::runtime_error("on_null: Has not been overridden."); }
virtual void on_blob_error(std::string_view s = {}) { throw std::runtime_error("on_blob_error: Has not been overridden."); }
virtual void on_verbatim_string(std::string_view s = {}) { throw std::runtime_error("on_verbatim_string: Has not been overridden."); }
virtual void on_blob_string(std::string_view s = {}) { throw std::runtime_error("on_blob_string: Has not been overridden."); }
virtual void on_streamed_string_part(std::string_view s = {}) { throw std::runtime_error("on_streamed_string_part: Has not been overridden."); }
};
template <class T>
std::enable_if<std::is_integral<T>::value, void>::type
from_string_view(std::string_view s, T& n)
{
auto r = std::from_chars(s.data(), s.data() + s.size(), n);
if (r.ec == std::errc::invalid_argument)
throw std::runtime_error("from_chars: Unable to convert");
}
void from_string_view(std::string_view s, std::string& r)
{ r = s; }
template <class T, class Allocator = std::allocator<T>>
struct response_list : response_throw {
std::list<T, Allocator> result;
void select_array(int n) override { }
void on_blob_string(std::string_view s) override
{
T r;
from_string_view(s, r);
result.push_back(std::move(r));
}
};
template <class T>
struct response_int : response_throw {
T result;
void on_number(std::string_view s) override
{ from_string_view(s, result); }
};
template<
class CharT,
class Traits = std::char_traits<CharT>,
class Allocator = std::allocator<CharT>>
struct response_basic_string : response_throw {
std::basic_string<CharT, Traits, Allocator> result;
void on_simple_string(std::string_view s)
{ from_string_view(s, result); }
};
using response_string = response_basic_string<char>;
template<
class Key,
class Compare = std::less<Key>,
class Allocator = std::allocator<Key>>
struct response_set : response_throw {
std::set<Key, Compare, Allocator> result;
void select_array(int n) override { }
void select_set(int n) override { }
void on_blob_string(std::string_view s) override
{
Key r;
from_string_view(s, r);
result.insert(std::end(result), std::move(r));
}
};
template <class T>
struct response_vector : response_throw {
private:
void add(std::string_view s = {})
{
T r;
from_string_view(s, r);
result.emplace_back(std::move(r));
}
public:
std::vector<T> result;
void clear() { result.clear(); }
auto size() const noexcept { return std::size(result); }
void select_array(int n) override { }
void select_push(int n) override { }
void select_set(int n) override { }
void select_map(int n) override { }
void select_attribute(int n) override { }
void on_simple_string(std::string_view s) override { add(s); }
void on_simple_error(std::string_view s) override { add(s); }
void on_number(std::string_view s) override { add(s); }
void on_double(std::string_view s) override { add(s); }
void on_bool(std::string_view s) override { add(s); }
void on_big_number(std::string_view s) override { add(s); }
void on_null() override { add(); }
void on_blob_error(std::string_view s = {}) override { add(s); }
void on_verbatim_string(std::string_view s = {}) override { add(s); }
void on_blob_string(std::string_view s = {}) override { add(s); }
void on_streamed_string_part(std::string_view s = {}) override { add(s); }
};
} // resp
} // aedis

View File

@@ -90,7 +90,7 @@ net::awaitable<void> test_list(int version)
co_await async_write(socket, net::buffer(p.payload));
resp::buffer buffer;
std::string buffer;
for (auto const& o : r) {
resp::response_vector<std::string> res;
co_await resp::async_read(socket, buffer, res);
@@ -155,7 +155,7 @@ net::awaitable<void> test_set(int version)
co_await async_write(socket, net::buffer(p.payload));
resp::buffer buffer;
std::string buffer;
for (auto const& o : r) {
resp::response_vector<std::string> res;
co_await resp::async_read(socket, buffer, res);
@@ -213,7 +213,7 @@ net::awaitable<void> offline()
//, {{bulk}, {test_bulk}}
};
resp::buffer buffer;
std::string buffer;
for (auto const& e : payloads) {
test_tcp_socket ts {e.first};
resp::response_vector<std::string> res;