2
0
mirror of https://github.com/boostorg/redis.git synced 2026-02-15 01:12:23 +00:00

Refactoring listed below:

- Improvements in the organization.
- Adds support for the default token on the consumer.
This commit is contained in:
Marcelo Zimbres
2021-10-10 15:00:22 +02:00
parent 62e34767ca
commit 05aff2f3c6
18 changed files with 540 additions and 439 deletions

View File

@@ -10,4 +10,5 @@
#include <aedis/version.hpp>
#include <aedis/resp3/write.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/resp3/read.hpp>
#include <aedis/resp3/consumer.hpp>
#include <aedis/resp3/response.hpp>

View File

@@ -11,7 +11,7 @@
namespace aedis {
/// Supported redis commands.
/// List of the supported redis commands.
enum class command
{ acl_load
, acl_save
@@ -75,7 +75,12 @@ enum class command
, unknown
};
/// Converts the command to a string.
std::string to_string(command c);
/** Writes the text representation of the command to the output
* stream.
*/
std::ostream& operator<<(std::ostream& os, command c);
} // aedis

View File

@@ -0,0 +1,48 @@
/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#pragma once
#include <aedis/net.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/response.hpp>
#include <aedis/resp3/detail/read.hpp>
namespace aedis {
namespace resp3 {
/** Reads and writes redis commands.
*/
struct consumer {
std::string buffer;
net::coroutine coro = net::coroutine();
type t = type::invalid;
template<
class AsyncReadWriteStream,
class CompletionToken =
net::default_completion_token_t<typename AsyncReadWriteStream::executor_type>
>
auto async_consume(
AsyncReadWriteStream& stream,
std::queue<request>& requests,
response& resp,
CompletionToken&& token =
net::default_completion_token_t<typename AsyncReadWriteStream::executor_type>{})
{
return net::async_compose<
CompletionToken,
void(boost::system::error_code, type)>(
detail::consumer_op{stream, buffer, requests, resp, t, coro},
token, stream);
}
};
} // resp3
} // aedis

View File

@@ -0,0 +1,112 @@
/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#pragma once
#include <queue>
#include <vector>
#include <string>
#include <numeric>
#include <algorithm>
#include <functional>
#include <type_traits>
#include <string_view>
#include <utility>
#include <aedis/command.hpp>
namespace aedis {
namespace resp3 {
namespace detail {
void add_bulk(std::string& to, std::string_view param);
void add_header(std::string& to, int size);
struct accumulator {
auto
operator()(
std::string a,
std::string_view b) const
{
add_bulk(a, b);
return a;
}
template <class T>
auto
operator()(
std::string a,
T b,
typename std::enable_if<(std::is_integral<T>::value || std::is_floating_point<T>::value), bool>::type = false) const
{
auto const v = std::to_string(b);
add_bulk(a, v);
return a;
}
auto
operator()(
std::string a,
std::pair<std::string, std::string_view> b) const
{
add_bulk(a, b.first);
add_bulk(a, b.second);
return a;
}
template <class T>
auto
operator()(
std::string a,
std::pair<T, std::string_view> b,
typename std::enable_if<(std::is_integral<T>::value || std::is_floating_point<T>::value), bool>::type = false) const
{
auto const v = std::to_string(b.first);
add_bulk(a, v);
add_bulk(a, b.second);
return a;
}
};
void assemble(std::string& ret, std::string_view cmd);
template <class Iter>
void assemble( std::string& ret
, std::string_view cmd
, std::initializer_list<std::string_view> key
, Iter begin
, Iter end
, int size = 1)
{
auto const d1 =
std::distance( std::cbegin(key)
, std::cend(key));
auto const d2 = std::distance(begin, end);
std::string a;
add_header(a, 1 + d1 + size * d2);
add_bulk(a, cmd);
auto b =
std::accumulate( std::cbegin(key)
, std::cend(key)
, std::move(a)
, accumulator{});
ret +=
std::accumulate( begin
, end
, std::move(b)
, accumulator{});
}
void assemble(std::string& ret, std::string_view cmd, std::string_view key);
} // detail
} // resp3
} // aedis

View File

@@ -0,0 +1,44 @@
/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <aedis/command.hpp>
namespace aedis {
namespace resp3 {
namespace detail {
void add_bulk(std::string& to, std::string_view param)
{
to += "$";
to += std::to_string(std::size(param));
to += "\r\n";
to += param;
to += "\r\n";
}
void add_header(std::string& to, int size)
{
to += "*";
to += std::to_string(size);
to += "\r\n";
}
void assemble(std::string& ret, std::string_view cmd)
{
add_header(ret, 1);
add_bulk(ret, cmd);
}
void assemble(std::string& ret, std::string_view cmd, std::string_view key)
{
std::initializer_list<std::string_view> dummy;
assemble(ret, cmd, {key}, std::cbegin(dummy), std::cend(dummy));
}
} // detail
} // resp3
} // aedis

View File

@@ -10,7 +10,9 @@
#include <string_view>
#include <aedis/resp3/response_adapter_base.hpp>
namespace aedis { namespace resp3 { namespace detail {
namespace aedis {
namespace resp3 {
namespace detail {
// resp3 parser.
class parser {
@@ -48,6 +50,79 @@ public:
auto bulk_length() const noexcept { return bulk_length_; }
};
// The parser supports up to 5 levels of nested structures. The first
// element in the sizes stack is a sentinel and must be different from
// 1.
template <class AsyncReadStream, class Storage>
class parse_op {
private:
AsyncReadStream& stream_;
Storage* buf_ = nullptr;
detail::parser parser_;
int start_ = 1;
public:
parse_op(AsyncReadStream& stream, Storage* buf, response_adapter_base* res)
: stream_ {stream}
, buf_ {buf}
, parser_ {res}
{ }
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
switch (start_) {
for (;;) {
if (parser_.bulk() == detail::parser::bulk_type::none) {
case 1:
start_ = 0;
net::async_read_until(
stream_,
net::dynamic_buffer(*buf_),
"\r\n",
std::move(self));
return;
}
// On a bulk read we can't read until delimiter since the
// payload may contain the delimiter itself so we have to
// read the whole chunk. However if the bulk blob is small
// enough it may be already on the buffer buf_ we read
// last time. If it is, there is no need of initiating
// another async op otherwise we have to read the
// missing bytes.
if (std::ssize(*buf_) < (parser_.bulk_length() + 2)) {
start_ = 0;
auto const s = std::ssize(*buf_);
auto const l = parser_.bulk_length();
auto const to_read = static_cast<std::size_t>(l + 2 - s);
buf_->resize(l + 2);
net::async_read(
stream_,
net::buffer(buf_->data() + s, to_read),
net::transfer_all(),
std::move(self));
return;
}
default:
{
if (ec)
return self.complete(ec);
n = parser_.advance(buf_->data(), n);
buf_->erase(0, n);
if (parser_.done())
return self.complete({});
}
}
}
}
};
} // detail
} // resp3
} // aedis

View File

@@ -7,93 +7,20 @@
#pragma once
#include <iostream>
#include <aedis/net.hpp>
#include <aedis/resp3/write.hpp>
#include <aedis/resp3/request.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/response.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/resp3/response_adapter_base.hpp>
#include <aedis/resp3/detail/parser.hpp>
#include <aedis/resp3/detail/write.hpp>
#include <boost/asio/yield.hpp>
namespace aedis { namespace resp3 {
// The parser supports up to 5 levels of nested structures. The first
// element in the sizes stack is a sentinel and must be different from
// 1.
template <class AsyncReadStream, class Storage>
class parse_op {
private:
AsyncReadStream& stream_;
Storage* buf_ = nullptr;
detail::parser parser_;
int start_ = 1;
public:
parse_op(AsyncReadStream& stream, Storage* buf, response_adapter_base* res)
: stream_ {stream}
, buf_ {buf}
, parser_ {res}
{ }
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
switch (start_) {
for (;;) {
if (parser_.bulk() == detail::parser::bulk_type::none) {
case 1:
start_ = 0;
net::async_read_until(
stream_,
net::dynamic_buffer(*buf_),
"\r\n",
std::move(self));
return;
}
// On a bulk read we can't read until delimiter since the
// payload may contain the delimiter itself so we have to
// read the whole chunk. However if the bulk blob is small
// enough it may be already on the buffer buf_ we read
// last time. If it is, there is no need of initiating
// another async op otherwise we have to read the
// missing bytes.
if (std::ssize(*buf_) < (parser_.bulk_length() + 2)) {
start_ = 0;
auto const s = std::ssize(*buf_);
auto const l = parser_.bulk_length();
auto const to_read = static_cast<std::size_t>(l + 2 - s);
buf_->resize(l + 2);
net::async_read(
stream_,
net::buffer(buf_->data() + s, to_read),
net::transfer_all(),
std::move(self));
return;
}
default:
{
if (ec)
return self.complete(ec);
n = parser_.advance(buf_->data(), n);
buf_->erase(0, n);
if (parser_.done())
return self.complete({});
}
}
}
}
};
namespace aedis {
namespace resp3 {
namespace detail {
template <class SyncReadStream, class Storage>
auto read(
@@ -102,10 +29,10 @@ auto read(
response_adapter_base& res,
boost::system::error_code& ec)
{
detail::parser p {&res};
parser p {&res};
std::size_t n = 0;
do {
if (p.bulk() == detail::parser::bulk_type::none) {
if (p.bulk() == parser::bulk_type::none) {
n = net::read_until(stream, net::dynamic_buffer(buf), "\r\n", ec);
if (ec || n < 3)
return n;
@@ -226,8 +153,9 @@ auto async_read_type(
>(type_op<AsyncReadStream, Storage> {stream, &buffer}, token, stream);
}
template <class AsyncReadWriteStream>
struct consumer_op {
net::ip::tcp::socket& socket;
AsyncReadWriteStream& stream;
std::string& buffer;
std::queue<request>& requests;
response& resp;
@@ -242,7 +170,7 @@ struct consumer_op {
{
reenter (coro) for (;;)
{
yield async_write_some(socket, requests, std::move(self));
yield async_write_some(stream, requests, std::move(self));
if (ec) {
self.complete(ec, type::invalid);
return;
@@ -250,7 +178,7 @@ struct consumer_op {
do {
do {
yield async_read_type(socket, buffer, std::move(self));
yield async_read_type(stream, buffer, std::move(self));
if (ec) {
self.complete(ec, type::invalid);
return;
@@ -262,11 +190,11 @@ struct consumer_op {
{
if (m_type == type::push) {
auto* adapter = resp.select_adapter(m_type, command::unknown, {});
async_read_one(socket, buffer, *adapter, std::move(self));
async_read_one(stream, buffer, *adapter, std::move(self));
} else {
auto const& pair = requests.front().ids.front();
auto* adapter = resp.select_adapter(m_type, pair.first, pair.second);
async_read_one(socket, buffer, *adapter, std::move(self));
async_read_one(stream, buffer, *adapter, std::move(self));
}
}
@@ -287,26 +215,7 @@ struct consumer_op {
}
};
struct consumer {
std::string buffer;
response resp;
net::coroutine coro = net::coroutine();
type t = type::invalid;
template<class CompletionToken>
auto async_consume(
net::ip::tcp::socket& socket,
std::queue<request>& requests,
response& resp,
CompletionToken&& token)
{
return net::async_compose<
CompletionToken,
void(boost::system::error_code, type)>(
consumer_op{socket, buffer, requests, resp, t, coro}, token, socket);
}
};
} // detail
} // resp3
} // aedis

View File

@@ -0,0 +1,114 @@
/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#pragma once
#include <chrono>
#include <aedis/net.hpp>
#include <aedis/resp3/request.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/asio/yield.hpp>
namespace aedis {
namespace resp3 {
namespace detail {
template<class SyncWriteStream>
std::size_t
write(
SyncWriteStream& stream,
request& req,
boost::system::error_code& ec)
{
static_assert(boost::beast::is_sync_write_stream<SyncWriteStream>::value,
"SyncWriteStream type requirements not met");
return write(stream, net::buffer(req.payload), ec);
}
template<class SyncWriteStream>
std::size_t write(
SyncWriteStream& stream,
request& req)
{
static_assert(boost::beast::is_sync_write_stream<SyncWriteStream>::value,
"SyncWriteStream type requirements not met");
boost::system::error_code ec;
auto const bytes_transferred = write(stream, req, ec);
if (ec)
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
return bytes_transferred;
}
/** Asynchronously writes one or more requests on the stream.
*/
template<class AsyncWriteStream>
struct write_some_op {
AsyncWriteStream& stream;
std::queue<request>& requests;
net::coroutine coro = net::coroutine();
void
operator()(
auto& self,
boost::system::error_code const& ec = {},
std::size_t n = 0)
{
reenter (coro) {
do {
assert(!std::empty(requests));
assert(!std::empty(requests.front().payload));
yield async_write(
stream,
net::buffer(requests.front().payload),
std::move(self));
if (ec)
break;
requests.front().sent = true;
if (std::empty(requests.front().ids)) {
// We only pop when all commands in the pipeline has push
// responses like subscribe, otherwise, pop is done when the
// response arrives.
requests.pop();
}
} while (!std::empty(requests) && std::empty(requests.front().ids));
self.complete(ec);
}
}
};
template<
class AsyncWriteStream,
class CompletionToken>
auto
async_write_some(
AsyncWriteStream& stream,
std::queue<request>& requests,
CompletionToken&& token)
{
return net::async_compose<
CompletionToken,
void(boost::system::error_code)>(
write_some_op{stream, requests},
token, stream);
}
} // detail
} // resp3
} // aedis
#include <boost/asio/unyield.hpp>

View File

@@ -1,41 +0,0 @@
/* Copyright (c) 2019 - 2021 Marcelo Zimbres Silva (mzimbres at gmail dot com)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include <aedis/command.hpp>
namespace aedis { namespace resp3 {
void add_bulk(std::string& to, std::string_view param)
{
to += "$";
to += std::to_string(std::size(param));
to += "\r\n";
to += param;
to += "\r\n";
}
void add_header(std::string& to, int size)
{
to += "*";
to += std::to_string(size);
to += "\r\n";
}
void assemble(std::string& ret, std::string_view cmd)
{
add_header(ret, 1);
add_bulk(ret, cmd);
}
void assemble(std::string& ret, std::string_view cmd, std::string_view key)
{
std::initializer_list<std::string_view> dummy;
assemble(ret, cmd, {key}, std::cbegin(dummy), std::cend(dummy));
}
} // resp3
} // aedis

View File

@@ -7,7 +7,8 @@
#include <aedis/resp3/write.hpp>
namespace aedis { namespace resp3 {
namespace aedis {
namespace resp3 {
bool prepare_next(std::queue<request>& reqs)
{

View File

@@ -34,7 +34,7 @@ bool operator==(node const& a, node const& b);
/** Writes the text representation of node to the output stream.
*
* NOTE: Be careful when printing binary data.
* NOTE: Bonary data is not converted to text.
*/
std::ostream& operator<<(std::ostream& os, node const& o);

View File

@@ -10,103 +10,15 @@
#include <queue>
#include <vector>
#include <string>
#include <numeric>
#include <algorithm>
#include <functional>
#include <type_traits>
#include <string_view>
#include <utility>
#include <aedis/command.hpp>
#include <aedis/resp3/detail/assemble.hpp>
namespace aedis {
namespace resp3 {
// TODO: Move to detail.
void add_bulk(std::string& to, std::string_view param);
void add_header(std::string& to, int size);
struct accumulator {
auto
operator()(
std::string a,
std::string_view b) const
{
add_bulk(a, b);
return a;
}
template <class T>
auto
operator()(
std::string a,
T b,
typename std::enable_if<(std::is_integral<T>::value || std::is_floating_point<T>::value), bool>::type = false) const
{
auto const v = std::to_string(b);
add_bulk(a, v);
return a;
}
auto
operator()(
std::string a,
std::pair<std::string, std::string_view> b) const
{
add_bulk(a, b.first);
add_bulk(a, b.second);
return a;
}
template <class T>
auto
operator()(
std::string a,
std::pair<T, std::string_view> b,
typename std::enable_if<(std::is_integral<T>::value || std::is_floating_point<T>::value), bool>::type = false) const
{
auto const v = std::to_string(b.first);
add_bulk(a, v);
add_bulk(a, b.second);
return a;
}
};
void assemble(std::string& ret, std::string_view cmd);
template <class Iter>
void assemble( std::string& ret
, std::string_view cmd
, std::initializer_list<std::string_view> key
, Iter begin
, Iter end
, int size = 1)
{
auto const d1 =
std::distance( std::cbegin(key)
, std::cend(key));
auto const d2 = std::distance(begin, end);
std::string a;
add_header(a, 1 + d1 + size * d2);
add_bulk(a, cmd);
auto b =
std::accumulate( std::cbegin(key)
, std::cend(key)
, std::move(a)
, accumulator{});
ret +=
std::accumulate( begin
, end
, std::move(b)
, accumulator{});
}
void assemble(std::string& ret, std::string_view cmd, std::string_view key);
/** A class to compose redis requests
*
* A request is composed of one or more redis commands and is
@@ -140,66 +52,66 @@ public:
void ping()
{
assemble(payload, "PING");
detail::assemble(payload, "PING");
ids.push(std::make_pair(command::ping, std::string{}));
}
void quit()
{
assemble(payload, "QUIT");
detail::assemble(payload, "QUIT");
ids.push(std::make_pair(command::quit, std::string{}));
}
void multi()
{
assemble(payload, "MULTI");
detail::assemble(payload, "MULTI");
ids.push(std::make_pair(command::multi, std::string{}));
}
void exec()
{
assemble(payload, "EXEC");
detail::assemble(payload, "EXEC");
ids.push(std::make_pair(command::exec, std::string{}));
}
void incr(std::string_view key)
{
assemble(payload, "INCR", key);
detail::assemble(payload, "INCR", key);
ids.push(std::make_pair(command::incr, std::string{key}));
}
/// Adds auth to the request, see https://redis.io/commands/bgrewriteaof
void auth(std::string_view pwd)
{
assemble(payload, "AUTH", pwd);
detail::assemble(payload, "AUTH", pwd);
ids.push(std::make_pair(command::auth, std::string{}));
}
/// Adds bgrewriteaof to the request, see https://redis.io/commands/bgrewriteaof
void bgrewriteaof()
{
assemble(payload, "BGREWRITEAOF");
detail::assemble(payload, "BGREWRITEAOF");
ids.push(std::make_pair(command::bgrewriteaof, std::string{}));
}
/// Adds role to the request, see https://redis.io/commands/role
void role()
{
assemble(payload, "ROLE");
detail::assemble(payload, "ROLE");
ids.push(std::make_pair(command::role, std::string{}));
}
/// Adds bgsave to the request, see //https://redis.io/commands/bgsave
void bgsave()
{
assemble(payload, "BGSAVE");
detail::assemble(payload, "BGSAVE");
ids.push(std::make_pair(command::bgsave, std::string{}));
}
/// Adds ping to the request, see https://redis.io/commands/flushall
void flushall()
{
assemble(payload, "FLUSHALL");
detail::assemble(payload, "FLUSHALL");
ids.push(std::make_pair(command::flushall, std::string{}));
}
@@ -207,10 +119,10 @@ public:
void lpop(std::string_view key, int count = 1)
{
//if (count == 1) {
assemble(payload, "LPOP", key);
detail::assemble(payload, "LPOP", key);
//} else {
//auto par = {std::to_string(count)};
//assemble(
//detail::assemble(
// payload,
// "LPOP",
// {key},
@@ -225,34 +137,34 @@ public:
void subscribe(std::string_view key)
{
// The response to this command is a push.
assemble(payload, "SUBSCRIBE", key);
detail::assemble(payload, "SUBSCRIBE", key);
}
/// Adds ping to the request, see https://redis.io/commands/unsubscribe
void unsubscribe(std::string_view key)
{
// The response to this command is a push.
assemble(payload, "UNSUBSCRIBE", key);
detail::assemble(payload, "UNSUBSCRIBE", key);
}
/// Adds ping to the request, see https://redis.io/commands/get
void get(std::string_view key)
{
assemble(payload, "GET", key);
detail::assemble(payload, "GET", key);
ids.push(std::make_pair(command::get, std::string{key}));
}
/// Adds ping to the request, see https://redis.io/commands/keys
void keys(std::string_view pattern)
{
assemble(payload, "KEYS", pattern);
detail::assemble(payload, "KEYS", pattern);
ids.push(std::make_pair(command::keys, std::string{}));
}
/// Adds ping to the request, see https://redis.io/commands/hello
void hello(std::string_view version = "3")
{
assemble(payload, "HELLO", version);
detail::assemble(payload, "HELLO", version);
ids.push(std::make_pair(command::hello, std::string{}));
}
@@ -260,7 +172,7 @@ public:
void sentinel(std::string_view arg, std::string_view name)
{
auto par = {name};
assemble(payload, "SENTINEL", {arg}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "SENTINEL", {arg}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::sentinel, std::string{}));
}
@@ -268,7 +180,7 @@ public:
void append(std::string_view key, std::string_view msg)
{
auto par = {msg};
assemble(payload, "APPEND", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "APPEND", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::append, std::string{key}));
}
@@ -279,7 +191,7 @@ public:
auto const end_str = std::to_string(end);
std::initializer_list<std::string_view> par {start_str, end_str};
assemble( payload
detail::assemble( payload
, "BITCOUNT"
, {key}
, std::cbegin(par)
@@ -291,7 +203,7 @@ public:
template <class Iter>
void rpush(std::string_view key, Iter begin, Iter end)
{
assemble(payload, "RPUSH", {key}, begin, end);
detail::assemble(payload, "RPUSH", {key}, begin, end);
ids.push(std::make_pair(command::rpush, std::string{key}));
}
@@ -315,21 +227,21 @@ public:
template <class Iter>
void lpush(std::string_view key, Iter begin, Iter end)
{
assemble(payload, "LPUSH", {key}, begin, end);
detail::assemble(payload, "LPUSH", {key}, begin, end);
ids.push(std::make_pair(command::lpush, std::string{key}));
}
void psubscribe( std::initializer_list<std::string_view> l)
{
std::initializer_list<std::string_view> dummy = {};
assemble(payload, "PSUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy));
detail::assemble(payload, "PSUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy));
}
/// Adds ping to the request, see https://redis.io/commands/publish
void publish(std::string_view key, std::string_view msg)
{
auto par = {msg};
assemble(payload, "PUBLISH", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "PUBLISH", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::publish, std::string{key}));
}
@@ -337,7 +249,7 @@ public:
void set(std::string_view key,
std::initializer_list<std::string_view> args)
{
assemble(payload, "SET", {key}, std::cbegin(args), std::cend(args));
detail::assemble(payload, "SET", {key}, std::cbegin(args), std::cend(args));
ids.push(std::make_pair(command::set, std::string{key}));
}
@@ -350,7 +262,7 @@ public:
//error: ERR Protocol error: expected '$', got '*'
using std::cbegin;
using std::cend;
assemble(payload, "HSET", {key}, std::cbegin(r), std::cend(r), 2);
detail::assemble(payload, "HSET", {key}, std::cbegin(r), std::cend(r), 2);
ids.push(std::make_pair(command::hset, std::string{key}));
}
@@ -359,7 +271,7 @@ public:
{
auto by_str = std::to_string(by);
std::initializer_list<std::string_view> par {field, by_str};
assemble(payload, "HINCRBY", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "HINCRBY", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::hincrby, std::string{key}));
}
@@ -367,28 +279,28 @@ public:
void hkeys(std::string_view key)
{
auto par = {""};
assemble(payload, "HKEYS", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "HKEYS", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::hkeys, std::string{key}));
}
/// Adds ping to the request, see https://redis.io/commands/hlen
void hlen(std::string_view key)
{
assemble(payload, "HLEN", {key});
detail::assemble(payload, "HLEN", {key});
ids.push(std::make_pair(command::hlen, std::string{key}));
}
/// Adds ping to the request, see https://redis.io/commands/hgetall
void hgetall(std::string_view key)
{
assemble(payload, "HGETALL", {key});
detail::assemble(payload, "HGETALL", {key});
ids.push(std::make_pair(command::hgetall, std::string{key}));
}
/// Adds ping to the request, see https://redis.io/commands/hvals
void hvals( std::string_view key)
{
assemble(payload, "HVALS", {key});
detail::assemble(payload, "HVALS", {key});
ids.push(std::make_pair(command::hvals, std::string{key}));
}
@@ -396,7 +308,7 @@ public:
void hget(std::string_view key, std::string_view field)
{
auto par = {field};
assemble(payload, "HGET", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "HGET", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::hget, std::string{key}));
}
@@ -405,7 +317,7 @@ public:
std::string_view key,
std::initializer_list<std::string_view> fields)
{
assemble( payload
detail::assemble( payload
, "HMGET"
, {key}
, std::cbegin(fields)
@@ -419,7 +331,7 @@ public:
hdel(std::string_view key,
std::initializer_list<std::string_view> fields)
{
assemble(
detail::assemble(
payload,
"HDEL",
{key},
@@ -434,7 +346,7 @@ public:
{
auto const str = std::to_string(secs);
std::initializer_list<std::string_view> par {str};
assemble(payload, "EXPIRE", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "EXPIRE", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::expire, std::string{key}));
}
@@ -443,7 +355,7 @@ public:
{
auto const score_str = std::to_string(score);
std::initializer_list<std::string_view> par = {score_str, value};
assemble(payload, "ZADD", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "ZADD", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::zadd, std::string{key}));
}
@@ -451,7 +363,7 @@ public:
template <class Range>
void zadd(std::initializer_list<std::string_view> key, Range const& r)
{
assemble(payload, "ZADD", key, std::cbegin(r), std::cend(r), 2);
detail::assemble(payload, "ZADD", key, std::cbegin(r), std::cend(r), 2);
ids.push(std::make_pair(command::zadd, std::string{key}));
}
@@ -462,7 +374,7 @@ public:
auto const max_str = std::to_string(max);
std::initializer_list<std::string_view> par {min_str, max_str};
assemble(payload, "ZRANGE", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "ZRANGE", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::zrange, std::string{key}));
}
@@ -475,7 +387,7 @@ public:
auto const min_str = std::to_string(min);
auto par = {min_str , max_str};
assemble(payload, "ZRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "ZRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::zrangebyscore, std::string{key}));
}
@@ -487,7 +399,7 @@ public:
std::string_view max)
{
auto par = {min, max};
assemble(payload, "ZREMRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "ZREMRANGEBYSCORE", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::zremrangebyscore, std::string{key}));
}
@@ -497,7 +409,7 @@ public:
auto const min_str = std::to_string(min);
auto const max_str = std::to_string(max);
std::initializer_list<std::string_view> par {min_str, max_str};
assemble(payload, "LRANGE", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "LRANGE", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::lrange, std::string{key}));
}
@@ -507,7 +419,7 @@ public:
auto const min_str = std::to_string(min);
auto const max_str = std::to_string(max);
std::initializer_list<std::string_view> par {min_str, max_str};
assemble(payload, "LTRIM", {key}, std::cbegin(par), std::cend(par));
detail::assemble(payload, "LTRIM", {key}, std::cbegin(par), std::cend(par));
ids.push(std::make_pair(command::ltrim, std::string{key}));
}
@@ -515,14 +427,14 @@ public:
/// Adds ping to the request, see https://redis.io/commands/del
void del(std::string_view key)
{
assemble(payload, "DEL", key);
detail::assemble(payload, "DEL", key);
ids.push(std::make_pair(command::del, std::string{key}));
}
/// Adds ping to the request, see https://redis.io/commands/llen
void llen(std::string_view key)
{
assemble(payload, "LLEN", key);
detail::assemble(payload, "LLEN", key);
ids.push(std::make_pair(command::llen, std::string{key}));
}
@@ -530,7 +442,7 @@ public:
template <class Iter>
void sadd(std::string_view key, Iter begin, Iter end)
{
assemble(payload, "SADD", {key}, begin, end);
detail::assemble(payload, "SADD", {key}, begin, end);
ids.push(std::make_pair(command::sadd, std::string{key}));
}
@@ -546,28 +458,28 @@ public:
/// Adds ping to the request, see https://redis.io/commands/smembers
void smembers(std::string_view key)
{
assemble(payload, "SMEMBERS", key);
detail::assemble(payload, "SMEMBERS", key);
ids.push(std::make_pair(command::smembers, std::string{key}));
}
/// Adds ping to the request, see https://redis.io/commands/scard
void scard(std::string_view key)
{
assemble(payload, "SCARD", key);
detail::assemble(payload, "SCARD", key);
ids.push(std::make_pair(command::scard, std::string{key}));
}
/// Adds ping to the request, see https://redis.io/commands/scard
void scard(std::string_view key, std::initializer_list<std::string_view> l)
{
assemble(payload, "SDIFF", {key}, std::cbegin(l), std::cend(l));
detail::assemble(payload, "SDIFF", {key}, std::cbegin(l), std::cend(l));
ids.push(std::make_pair(command::sdiff, std::string{key}));
}
/// Adds ping to the request, see https://redis.io/commands/client_id
void client_id(std::string_view parameters)
{
assemble(payload, "CLIENT ID", {parameters});
detail::assemble(payload, "CLIENT ID", {parameters});
ids.push(std::make_pair(command::client_id, std::string{}));
}
};

View File

@@ -11,7 +11,8 @@
#include <vector>
#include <string>
namespace aedis { namespace resp3 {
namespace aedis {
namespace resp3 {
/// RESP3 message types.
enum class type
@@ -40,7 +41,7 @@ std::string to_string(type t);
// TODO: Move to detail?
type to_type(char c);
/// Writes the string representation of type to the stream.
/// Writes the string representation of type to the output stream.
std::ostream& operator<<(std::ostream& os, type t);
} // resp3

View File

@@ -15,97 +15,13 @@
#include <boost/beast/core/stream_traits.hpp>
#include <boost/asio/yield.hpp>
namespace aedis { namespace resp3 {
namespace aedis {
namespace resp3 {
bool prepare_next(std::queue<request>& reqs);
template<class SyncWriteStream>
std::size_t
write(
SyncWriteStream& stream,
request& req,
boost::system::error_code& ec)
{
static_assert(boost::beast::is_sync_write_stream<SyncWriteStream>::value,
"SyncWriteStream type requirements not met");
return write(stream, net::buffer(req.payload), ec);
}
template<class SyncWriteStream>
std::size_t write(
SyncWriteStream& stream,
request& req)
{
static_assert(boost::beast::is_sync_write_stream<SyncWriteStream>::value,
"SyncWriteStream type requirements not met");
boost::system::error_code ec;
auto const bytes_transferred = write(stream, req, ec);
if (ec)
BOOST_THROW_EXCEPTION(boost::system::system_error{ec});
return bytes_transferred;
}
/** Asynchronously writes one or more requests on the stream.
/** Prepares the back of a queue to receive further commands and
* returns true if a write is possible.
*/
template<class AsyncWriteStream>
struct write_some_op {
AsyncWriteStream& stream;
std::queue<request>& requests;
net::coroutine coro = net::coroutine();
void
operator()(
auto& self,
boost::system::error_code const& ec = {},
std::size_t n = 0)
{
reenter (coro) {
do {
assert(!std::empty(requests));
assert(!std::empty(requests.front().payload));
yield async_write(
stream,
net::buffer(requests.front().payload),
std::move(self));
if (ec)
break;
requests.front().sent = true;
if (std::empty(requests.front().ids)) {
// We only pop when all commands in the pipeline has push
// responses like subscribe, otherwise, pop is done when the
// response arrives.
requests.pop();
}
} while (!std::empty(requests) && std::empty(requests.front().ids));
self.complete(ec);
}
}
};
template<
class AsyncWriteStream,
class CompletionToken>
auto
async_write_some(
AsyncWriteStream& stream,
std::queue<request>& requests,
CompletionToken&& token)
{
return net::async_compose<
CompletionToken,
void(boost::system::error_code)>(
write_some_op{stream, requests},
token, stream);
}
bool prepare_next(std::queue<request>& reqs);
} // resp3
} // aedis

View File

@@ -7,8 +7,8 @@
#include <aedis/impl/command.ipp>
#include <aedis/resp3/impl/write.ipp>
#include <aedis/resp3/impl/request.ipp>
#include <aedis/resp3/impl/response.ipp>
#include <aedis/resp3/impl/type.ipp>
#include <aedis/resp3/impl/node.ipp>
#include <aedis/resp3/detail/impl/assemble.ipp>
#include <aedis/resp3/detail/impl/parser.ipp>

View File

@@ -5,5 +5,5 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#define AEDIS_VERSION 4
#define AEDIS_VERSION 5