mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Refactors the serializer class.
This commit is contained in:
@@ -8,6 +8,7 @@
|
||||
#define AEDIS_HPP
|
||||
|
||||
#include <aedis/resp3/read.hpp>
|
||||
#include <aedis/resp3/write.hpp>
|
||||
#include <aedis/adapter/adapt.hpp>
|
||||
#include <aedis/adapter/error.hpp>
|
||||
#include <aedis/redis/command.hpp>
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <algorithm>
|
||||
#include <utility>
|
||||
#include <chrono>
|
||||
#include <queue>
|
||||
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
@@ -22,7 +23,9 @@
|
||||
#include <aedis/resp3/type.hpp>
|
||||
#include <aedis/resp3/node.hpp>
|
||||
#include <aedis/redis/command.hpp>
|
||||
#include <aedis/generic/serializer.hpp>
|
||||
#include <aedis/generic/detail/client_ops.hpp>
|
||||
#include <aedis/generic/serializer.hpp>
|
||||
|
||||
namespace aedis {
|
||||
namespace generic {
|
||||
@@ -96,7 +99,6 @@ public:
|
||||
, cfg_{cfg}
|
||||
, on_write_{[](std::size_t){}}
|
||||
, adapter_{[](Command, resp3::node<boost::string_view> const&, boost::system::error_code&) {}}
|
||||
, sr_{requests_}
|
||||
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
|
||||
, type_{resp3::type::invalid}
|
||||
, cmd_info_{std::make_pair<Command>(Command::invalid, 0)}
|
||||
@@ -118,19 +120,8 @@ public:
|
||||
template <class... Ts>
|
||||
void send(Command cmd, Ts const&... args)
|
||||
{
|
||||
auto const can_write = prepare_next_req();
|
||||
|
||||
auto const before = requests_.size();
|
||||
sr_.push(cmd, args...);
|
||||
auto const d = requests_.size() - before;
|
||||
BOOST_ASSERT(d != 0);
|
||||
info_.back().size += d;;
|
||||
|
||||
if (!has_push_response(cmd)) {
|
||||
commands_.push_back(std::make_pair(cmd, d));
|
||||
++info_.back().cmds;
|
||||
}
|
||||
|
||||
auto const can_write = prepare_back();
|
||||
reqs_.back().first.push(cmd, args...);
|
||||
if (can_write)
|
||||
wait_write_timer_.cancel_one();
|
||||
}
|
||||
@@ -149,19 +140,8 @@ public:
|
||||
if (begin == end)
|
||||
return;
|
||||
|
||||
auto const can_write = prepare_next_req();
|
||||
|
||||
auto const before = requests_.size();
|
||||
sr_.push_range2(cmd, key, begin, end);
|
||||
auto const d = requests_.size() - before;
|
||||
BOOST_ASSERT(d != 0);
|
||||
info_.back().size += d;
|
||||
|
||||
if (!has_push_response(cmd)) {
|
||||
commands_.push_back(std::make_pair(cmd, d));
|
||||
++info_.back().cmds;
|
||||
}
|
||||
|
||||
auto const can_write = prepare_back();
|
||||
reqs_.back().first.push_range2(cmd, key, begin, end);
|
||||
if (can_write)
|
||||
wait_write_timer_.cancel_one();
|
||||
}
|
||||
@@ -180,19 +160,8 @@ public:
|
||||
if (begin == end)
|
||||
return;
|
||||
|
||||
auto const can_write = prepare_next_req();
|
||||
|
||||
auto const before = requests_.size();
|
||||
sr_.push_range2(cmd, begin, end);
|
||||
auto const d = requests_.size() - before;
|
||||
BOOST_ASSERT(d != 0);
|
||||
info_.back().size += d;
|
||||
|
||||
if (!has_push_response(cmd)) {
|
||||
commands_.push_back(std::make_pair(cmd, d));
|
||||
++info_.back().cmds;
|
||||
}
|
||||
|
||||
auto const can_write = prepare_back();
|
||||
reqs_.back().first.push_range2(cmd, begin, end);
|
||||
if (can_write)
|
||||
wait_write_timer_.cancel_one();
|
||||
}
|
||||
@@ -300,7 +269,7 @@ public:
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(detail::run_op<client>{this}, token, read_timer_, write_timer_, wait_write_timer_);
|
||||
>(detail::run_op<client, Command>{this}, token, read_timer_, write_timer_, wait_write_timer_);
|
||||
}
|
||||
|
||||
/** @brief Receives events produces by the run operation.
|
||||
@@ -328,20 +297,20 @@ public:
|
||||
socket_->close();
|
||||
wait_write_timer_.expires_at(std::chrono::steady_clock::now());
|
||||
ch_.cancel();
|
||||
reqs_ = {};
|
||||
}
|
||||
|
||||
private:
|
||||
using command_info_type = std::pair<Command, std::size_t>;
|
||||
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
|
||||
using channel_type = boost::asio::experimental::channel<void(boost::system::error_code, Command, std::size_t)>;
|
||||
|
||||
template <class T, class V> friend struct detail::reader_op;
|
||||
template <class T, class V> friend struct detail::ping_after_op;
|
||||
template <class T, class V> friend struct detail::read_op;
|
||||
template <class T, class V> friend struct detail::run_op;
|
||||
template <class T> friend struct detail::read_until_op;
|
||||
template <class T> friend struct detail::writer_op;
|
||||
template <class T> friend struct detail::write_op;
|
||||
template <class T> friend struct detail::run_op;
|
||||
template <class T> friend struct detail::connect_op;
|
||||
template <class T> friend struct detail::resolve_op;
|
||||
template <class T> friend struct detail::check_idle_op;
|
||||
@@ -349,93 +318,29 @@ private:
|
||||
template <class T> friend struct detail::read_write_check_op;
|
||||
template <class T> friend struct detail::wait_for_data_op;
|
||||
|
||||
void prepare_state()
|
||||
{
|
||||
// When we are reconnecting we can't simply call send(hello)
|
||||
// as that will add the command to the end of the queue, we need
|
||||
// it as the first element.
|
||||
if (info_.empty()) {
|
||||
// Either we are connecting for the first time or there are
|
||||
// no commands that were left unresponded from the last
|
||||
// connection. We can send hello as usual.
|
||||
BOOST_ASSERT(requests_.empty());
|
||||
BOOST_ASSERT(commands_.empty());
|
||||
send(Command::hello, 3);
|
||||
return;
|
||||
}
|
||||
|
||||
if (info_.front().sent) {
|
||||
// There is one request that was left unresponded when we
|
||||
// e.g. lost the connection, since we erase requests right
|
||||
// after writing them to the socket (to avoid resubmission) it
|
||||
// is lost and we have to remove it.
|
||||
|
||||
// Noop if info_.front().size is already zero, which happens
|
||||
// when the request was successfully writen to the socket.
|
||||
// In the future we may want to avoid erasing but resend (at
|
||||
// the risc of resubmission).
|
||||
requests_.erase(0, info_.front().size);
|
||||
|
||||
// Erases the commands that were lost as well.
|
||||
commands_.erase(
|
||||
std::begin(commands_),
|
||||
std::begin(commands_) + info_.front().cmds);
|
||||
|
||||
info_.front().cmds = 0;
|
||||
|
||||
// Do not erase the info_ front as we will use it below.
|
||||
// info_.erase(std::begin(info_));
|
||||
}
|
||||
|
||||
// Code below will add a hello to the front of the request and
|
||||
// update info_ and commands_ accordingly.
|
||||
|
||||
auto const old_size = requests_.size();
|
||||
sr_.push(Command::hello, 3);
|
||||
auto const hello_size = requests_.size() - old_size;;
|
||||
|
||||
// Now we have to rotate the hello to the front of the request
|
||||
// (Remember it must always be the first command).
|
||||
std::rotate(
|
||||
std::begin(requests_),
|
||||
std::begin(requests_) + old_size,
|
||||
std::end(requests_));
|
||||
|
||||
// Updates info_.
|
||||
info_.front().size += hello_size;
|
||||
info_.front().cmds += 1;
|
||||
|
||||
// Updates commands_
|
||||
commands_.push_back(std::make_pair(Command::hello, hello_size));
|
||||
std::rotate(
|
||||
std::begin(commands_),
|
||||
std::prev(std::end(commands_)),
|
||||
std::end(commands_));
|
||||
}
|
||||
|
||||
// Prepares the back of the queue to receive further commands. If
|
||||
// true is returned the request in the front of the queue can be
|
||||
// sent to the server.
|
||||
bool prepare_next_req()
|
||||
bool prepare_back()
|
||||
{
|
||||
if (info_.empty()) {
|
||||
info_.push_back({});
|
||||
if (reqs_.empty()) {
|
||||
reqs_.push_back({});
|
||||
return true;
|
||||
}
|
||||
|
||||
if (info_.front().sent) {
|
||||
if (reqs_.front().second) {
|
||||
// There is a pending response, we can't modify the front of
|
||||
// the vector.
|
||||
BOOST_ASSERT(info_.front().cmds != 0);
|
||||
if (info_.size() == 1)
|
||||
info_.push_back({});
|
||||
BOOST_ASSERT(!reqs_.front().first.commands().empty());
|
||||
if (reqs_.size() == 1)
|
||||
reqs_.push_back({});
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// When cmds = 0 there are only commands with push response on
|
||||
// the request and we are not waiting for any response.
|
||||
return info_.front().cmds == 0;
|
||||
return reqs_.front().first.commands().empty();
|
||||
}
|
||||
|
||||
// Returns true when the next request can be written.
|
||||
@@ -444,17 +349,17 @@ private:
|
||||
if (type_ == resp3::type::push)
|
||||
return false;
|
||||
|
||||
BOOST_ASSERT(!info_.empty());
|
||||
BOOST_ASSERT(!commands_.empty());
|
||||
BOOST_ASSERT(!reqs_.empty());
|
||||
BOOST_ASSERT(!reqs_.front().first.commands().empty());
|
||||
|
||||
commands_.erase(std::begin(commands_));
|
||||
reqs_.front().first.pop();
|
||||
|
||||
if (--info_.front().cmds != 0)
|
||||
if (!reqs_.front().first.commands().empty())
|
||||
return false;
|
||||
|
||||
info_.erase(std::begin(info_));
|
||||
reqs_.pop_front();
|
||||
|
||||
return !info_.empty();
|
||||
return !reqs_.empty();
|
||||
}
|
||||
|
||||
// Resolves the address passed in async_run and store the results
|
||||
@@ -586,20 +491,6 @@ private:
|
||||
>(detail::check_idle_op<client>{this}, token, check_idle_timer_);
|
||||
}
|
||||
|
||||
// Stores information about a request.
|
||||
struct info {
|
||||
// Set to true before calling async_write.
|
||||
bool sent = false;
|
||||
|
||||
// Request size in bytes. After a successful write it is set to
|
||||
// zero.
|
||||
std::size_t size = 0;
|
||||
|
||||
// The number of commands it contains. Commands with push
|
||||
// responses are not counted.
|
||||
std::size_t cmds = 0;
|
||||
};
|
||||
|
||||
// Used to resolve the host on async_resolve.
|
||||
boost::asio::ip::tcp::resolver resv_;
|
||||
|
||||
@@ -635,22 +526,15 @@ private:
|
||||
// Buffer used by the read operations.
|
||||
std::string read_buffer_;
|
||||
|
||||
// Requests payload and its serializer.
|
||||
std::string requests_;
|
||||
serializer<std::string> sr_;
|
||||
|
||||
// The commands contained in the requests.
|
||||
std::vector<command_info_type> commands_;
|
||||
|
||||
// Info about the requests.
|
||||
std::vector<info> info_;
|
||||
std::deque<std::pair<generic::serializer<Command>, bool>> reqs_;
|
||||
|
||||
// Last time we received data.
|
||||
time_point_type last_data_;
|
||||
|
||||
// Used by the read_op.
|
||||
resp3::type type_;
|
||||
command_info_type cmd_info_;
|
||||
typename generic::serializer<Command>::command_info_type cmd_info_;
|
||||
|
||||
// See async_connect.
|
||||
boost::asio::ip::tcp::endpoint endpoint_;
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include <aedis/resp3/detail/parser.hpp>
|
||||
#include <aedis/resp3/read.hpp>
|
||||
#include <aedis/generic/error.hpp>
|
||||
#include <aedis/redis/command.hpp>
|
||||
|
||||
namespace aedis {
|
||||
namespace generic {
|
||||
@@ -331,7 +332,7 @@ struct read_write_check_op {
|
||||
}
|
||||
};
|
||||
|
||||
template <class Client>
|
||||
template <class Client, class Command>
|
||||
struct run_op {
|
||||
Client* cli;
|
||||
boost::asio::coroutine coro;
|
||||
@@ -347,7 +348,7 @@ struct run_op {
|
||||
return;
|
||||
}
|
||||
|
||||
cli->prepare_state();
|
||||
cli->send(Command::hello, 3);
|
||||
|
||||
yield cli->async_read_write_check(std::move(self));
|
||||
if (ec) {
|
||||
@@ -374,15 +375,14 @@ struct write_op {
|
||||
{
|
||||
reenter (coro)
|
||||
{
|
||||
BOOST_ASSERT(!cli->info_.empty());
|
||||
BOOST_ASSERT(cli->info_.front().size != 0);
|
||||
BOOST_ASSERT(!cli->requests_.empty());
|
||||
BOOST_ASSERT(!cli->reqs_.empty());
|
||||
BOOST_ASSERT(!cli->reqs_.front().first.empty());
|
||||
|
||||
cli->write_timer_.expires_after(cli->cfg_.write_timeout);
|
||||
cli->info_.front().sent = true;
|
||||
cli->reqs_.front().second = true;
|
||||
yield
|
||||
boost::asio::experimental::make_parallel_group(
|
||||
[this](auto token) { return boost::asio::async_write(*cli->socket_, boost::asio::buffer(cli->requests_.data(), cli->info_.front().size), token);},
|
||||
[this](auto token) { return boost::asio::async_write(*cli->socket_, boost::asio::buffer(cli->reqs_.front().first.data(), cli->reqs_.front().first.size()), token);},
|
||||
[this](auto token) { return cli->write_timer_.async_wait(token);}
|
||||
).async_wait(
|
||||
boost::asio::experimental::wait_for_one(),
|
||||
@@ -410,16 +410,13 @@ struct write_op {
|
||||
|
||||
cli->bytes_written_ = n;
|
||||
|
||||
BOOST_ASSERT(!cli->info_.empty());
|
||||
BOOST_ASSERT(cli->info_.front().size != 0);
|
||||
BOOST_ASSERT(!cli->requests_.empty());
|
||||
BOOST_ASSERT(n == cli->info_.front().size);
|
||||
|
||||
cli->requests_.erase(0, n);
|
||||
cli->info_.front().size = 0;
|
||||
if (cli->info_.front().cmds == 0)
|
||||
cli->info_.erase(std::begin(cli->info_));
|
||||
BOOST_ASSERT(!cli->reqs_.empty());
|
||||
BOOST_ASSERT(!cli->reqs_.front().first.empty());
|
||||
BOOST_ASSERT(!cli->reqs_.empty());
|
||||
BOOST_ASSERT(n == cli->reqs_.front().first.size());
|
||||
|
||||
if (cli->reqs_.front().first.commands().empty())
|
||||
cli->reqs_.pop_front();
|
||||
self.complete({});
|
||||
}
|
||||
}
|
||||
@@ -530,8 +527,8 @@ struct reader_op {
|
||||
cli->type_ = resp3::to_type(cli->read_buffer_.front());
|
||||
cli->cmd_info_ = std::make_pair(Command::invalid, 0);
|
||||
if (cli->type_ != resp3::type::push) {
|
||||
BOOST_ASSERT(!cli->commands_.empty());
|
||||
cli->cmd_info_ = cli->commands_.front();
|
||||
BOOST_ASSERT(!cli->reqs_.front().first.commands().empty());
|
||||
cli->cmd_info_ = cli->reqs_.front().first.commands().front();
|
||||
}
|
||||
|
||||
cli->last_data_ = std::chrono::steady_clock::now();
|
||||
|
||||
@@ -29,14 +29,13 @@ namespace generic {
|
||||
* Example
|
||||
*
|
||||
* @code
|
||||
* std::string request;
|
||||
* auto sr = make_serializer(request);
|
||||
* sr.push(command::hello, 3);
|
||||
* sr.push(command::flushall);
|
||||
* sr.push(command::ping);
|
||||
* sr.push(command::incr, "key");
|
||||
* sr.push(command::quit);
|
||||
* co_await async_write(socket, buffer(request));
|
||||
* request r;
|
||||
* r.push(command::hello, 3);
|
||||
* r.push(command::flushall);
|
||||
* r.push(command::ping);
|
||||
* r.push(command::incr, "key");
|
||||
* r.push(command::quit);
|
||||
* co_await async_write(socket, buffer(r));
|
||||
* @endcode
|
||||
*
|
||||
* \tparam Storage The storage type e.g \c std::string.
|
||||
@@ -44,18 +43,32 @@ namespace generic {
|
||||
* \remarks Non-string types will be converted to string by using \c
|
||||
* to_bulk, which must be made available over ADL.
|
||||
*/
|
||||
template <class Storage>
|
||||
template <class Command>
|
||||
class serializer {
|
||||
public:
|
||||
using command_info_type = std::pair<Command, std::size_t>;
|
||||
|
||||
private:
|
||||
Storage* request_;
|
||||
|
||||
std::string payload_;
|
||||
std::vector<command_info_type> commands_;
|
||||
|
||||
public:
|
||||
/** \brief Constructor
|
||||
*
|
||||
* \param storage The underlying storage object i.e. where the
|
||||
* request is to be stored.
|
||||
*/
|
||||
serializer(Storage& storage) : request_(&storage) {}
|
||||
auto const& commands() const { return commands_;};
|
||||
auto size() const { return payload_.size();}
|
||||
auto empty() const { return payload_.empty(); }
|
||||
auto const* data() const { return payload_.data(); }
|
||||
auto const& payload() const { return payload_;}
|
||||
|
||||
void pop()
|
||||
{
|
||||
BOOST_ASSERT(!commands_.empty());
|
||||
commands_.erase(std::begin(commands_));
|
||||
|
||||
// TODO: Erase the payload, perhaps by adding an offset of
|
||||
// already acknolodged commands.
|
||||
// TODO: Add function to enable laze pop.
|
||||
}
|
||||
|
||||
/** @brief Appends a new command to the end of the request.
|
||||
*
|
||||
@@ -73,18 +86,22 @@ public:
|
||||
* \param cmd The command e.g redis or sentinel command.
|
||||
* \param args Command arguments.
|
||||
*/
|
||||
template <class Command, class... Ts>
|
||||
template <class... Ts>
|
||||
void push(Command cmd, Ts const&... args)
|
||||
{
|
||||
using boost::hana::for_each;
|
||||
using boost::hana::make_tuple;
|
||||
using resp3::type;
|
||||
|
||||
auto const before = payload_.size();
|
||||
auto constexpr pack_size = sizeof...(Ts);
|
||||
resp3::add_header(*request_, type::array, 1 + pack_size);
|
||||
resp3::add_header(payload_, type::array, 1 + pack_size);
|
||||
resp3::add_bulk(payload_, to_string(cmd));
|
||||
resp3::add_bulk(payload_, make_tuple(args...));
|
||||
|
||||
resp3::add_bulk(*request_, to_string(cmd));
|
||||
resp3::add_bulk(*request_, make_tuple(args...));
|
||||
auto const after = payload_.size();
|
||||
if (!has_push_response(cmd))
|
||||
commands_.push_back(std::make_pair(cmd, after - before));
|
||||
}
|
||||
|
||||
/** @brief Appends a new command to the end of the request.
|
||||
@@ -108,7 +125,7 @@ public:
|
||||
* \param begin Iterator to the begin of the range.
|
||||
* \param end Iterator to the end of the range.
|
||||
*/
|
||||
template <class Command, class Key, class ForwardIterator>
|
||||
template <class Key, class ForwardIterator>
|
||||
void push_range2(Command cmd, Key const& key, ForwardIterator begin, ForwardIterator end)
|
||||
{
|
||||
using value_type = typename std::iterator_traits<ForwardIterator>::value_type;
|
||||
@@ -117,14 +134,20 @@ public:
|
||||
if (begin == end)
|
||||
return;
|
||||
|
||||
auto const before = payload_.size();
|
||||
|
||||
auto constexpr size = resp3::bulk_counter<value_type>::size;
|
||||
auto const distance = std::distance(begin, end);
|
||||
resp3::add_header(*request_, type::array, 2 + size * distance);
|
||||
resp3::add_bulk(*request_, to_string(cmd));
|
||||
resp3::add_bulk(*request_, key);
|
||||
resp3::add_header(payload_, type::array, 2 + size * distance);
|
||||
resp3::add_bulk(payload_, to_string(cmd));
|
||||
resp3::add_bulk(payload_, key);
|
||||
|
||||
for (; begin != end; ++begin)
|
||||
resp3::add_bulk(*request_, *begin);
|
||||
resp3::add_bulk(payload_, *begin);
|
||||
|
||||
auto const after = payload_.size();
|
||||
if (!has_push_response(cmd))
|
||||
commands_.push_back(std::make_pair(cmd, after - before));
|
||||
}
|
||||
|
||||
/** @brief Appends a new command to the end of the request.
|
||||
@@ -144,7 +167,7 @@ public:
|
||||
* \param begin Iterator to the begin of the range.
|
||||
* \param end Iterator to the end of the range.
|
||||
*/
|
||||
template <class Command, class ForwardIterator>
|
||||
template <class ForwardIterator>
|
||||
void push_range2(Command cmd, ForwardIterator begin, ForwardIterator end)
|
||||
{
|
||||
using value_type = typename std::iterator_traits<ForwardIterator>::value_type;
|
||||
@@ -153,20 +176,25 @@ public:
|
||||
if (begin == end)
|
||||
return;
|
||||
|
||||
auto const before = payload_.size();
|
||||
auto constexpr size = resp3::bulk_counter<value_type>::size;
|
||||
auto const distance = std::distance(begin, end);
|
||||
resp3::add_header(*request_, type::array, 1 + size * distance);
|
||||
resp3::add_bulk(*request_, to_string(cmd));
|
||||
resp3::add_header(payload_, type::array, 1 + size * distance);
|
||||
resp3::add_bulk(payload_, to_string(cmd));
|
||||
|
||||
for (; begin != end; ++begin)
|
||||
resp3::add_bulk(*request_, *begin);
|
||||
resp3::add_bulk(payload_, *begin);
|
||||
|
||||
auto const after = payload_.size();
|
||||
if (!has_push_response(cmd))
|
||||
commands_.push_back(std::make_pair(cmd, before - after));
|
||||
}
|
||||
|
||||
/** @brief Appends a new command to the end of the request.
|
||||
*
|
||||
* Equivalent to the overload taking a range (i.e. send_range2).
|
||||
*/
|
||||
template <class Command, class Key, class Range>
|
||||
template <class Key, class Range>
|
||||
void push_range(Command cmd, Key const& key, Range const& range)
|
||||
{
|
||||
using std::begin;
|
||||
@@ -178,7 +206,7 @@ public:
|
||||
*
|
||||
* Equivalent to the overload taking a range (i.e. send_range2).
|
||||
*/
|
||||
template <class Command, class Range>
|
||||
template <class Range>
|
||||
void push_range(Command cmd, Range const& range)
|
||||
{
|
||||
using std::begin;
|
||||
@@ -187,16 +215,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
/** \brief Creates a serializer.
|
||||
* \ingroup any
|
||||
* \param storage The string.
|
||||
*/
|
||||
template <class CharT, class Traits, class Allocator>
|
||||
auto make_serializer(std::basic_string<CharT, Traits, Allocator>& storage)
|
||||
{
|
||||
return serializer<std::basic_string<CharT, Traits, Allocator>>(storage);
|
||||
}
|
||||
|
||||
} // generic
|
||||
} // aedis
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
#include <boost/asio/read.hpp>
|
||||
#include <boost/asio/compose.hpp>
|
||||
#include <boost/asio/async_result.hpp>
|
||||
#include <boost/asio/yield.hpp>
|
||||
|
||||
namespace aedis {
|
||||
namespace resp3 {
|
||||
@@ -189,6 +188,4 @@ auto async_read(
|
||||
} // resp3
|
||||
} // aedis
|
||||
|
||||
#include <boost/asio/unyield.hpp>
|
||||
|
||||
#endif // AEDIS_RESP3_READ_HPP
|
||||
|
||||
59
aedis/resp3/write.hpp
Normal file
59
aedis/resp3/write.hpp
Normal file
@@ -0,0 +1,59 @@
|
||||
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
|
||||
*
|
||||
* Distributed under the Boost Software License, Version 1.0. (See
|
||||
* accompanying file LICENSE.txt)
|
||||
*/
|
||||
|
||||
#ifndef AEDIS_RESP3_WRITE_HPP
|
||||
#define AEDIS_RESP3_WRITE_HPP
|
||||
|
||||
#include <boost/asio/write.hpp>
|
||||
|
||||
namespace aedis {
|
||||
namespace resp3 {
|
||||
|
||||
/** @brief TODO
|
||||
*/
|
||||
template<
|
||||
class SyncWriteStream,
|
||||
class Request
|
||||
>
|
||||
std::size_t write(SyncWriteStream& stream, Request const& req)
|
||||
{
|
||||
return boost::asio::write(stream, boost::asio::buffer(req.payload()));
|
||||
}
|
||||
|
||||
/** @brief TODO
|
||||
*/
|
||||
template<
|
||||
class SyncWriteStream,
|
||||
class Request
|
||||
>
|
||||
std::size_t write(
|
||||
SyncWriteStream& stream,
|
||||
Request const& req,
|
||||
boost::system::error_code& ec)
|
||||
{
|
||||
return boost::asio::write(stream, boost::asio::buffer(req.payload()), ec);
|
||||
}
|
||||
|
||||
/** @brief Writes a request asynchronously.
|
||||
*/
|
||||
template<
|
||||
class AsyncWriteStream,
|
||||
class Request,
|
||||
class CompletionToken = boost::asio::default_completion_token_t<typename AsyncWriteStream::executor_type>
|
||||
>
|
||||
auto async_write(
|
||||
AsyncWriteStream& stream,
|
||||
Request const& req,
|
||||
CompletionToken&& token =
|
||||
boost::asio::default_completion_token_t<typename AsyncWriteStream::executor_type>{})
|
||||
{
|
||||
return boost::asio::async_write(stream, boost::asio::buffer(req.payload()));
|
||||
}
|
||||
|
||||
} // resp3
|
||||
} // aedis
|
||||
|
||||
#endif // AEDIS_RESP3_WRITE_HPP
|
||||
@@ -18,7 +18,7 @@ namespace resp3 = aedis::resp3;
|
||||
|
||||
using aedis::resp3::node;
|
||||
using aedis::redis::command;
|
||||
using aedis::generic::make_serializer;
|
||||
using aedis::generic::serializer;
|
||||
using net::ip::tcp;
|
||||
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
|
||||
|
||||
@@ -31,19 +31,18 @@ net::awaitable<void> example()
|
||||
tcp_socket socket{ex};
|
||||
co_await socket.async_connect(*std::begin(res));
|
||||
|
||||
std::string request, buffer;
|
||||
|
||||
auto sr = make_serializer(request);
|
||||
sr.push(command::hello, 3);
|
||||
sr.push(command::ping, "Some message.");
|
||||
sr.push(command::quit);
|
||||
co_await net::async_write(socket, net::buffer(request));
|
||||
serializer<command> req;
|
||||
req.push(command::hello, 3);
|
||||
req.push(command::ping, "Some message.");
|
||||
req.push(command::quit);
|
||||
co_await resp3::async_write(socket, req);
|
||||
|
||||
auto adapter = [](node<boost::string_view> const& nd, boost::system::error_code&)
|
||||
{
|
||||
std::cout << nd << std::endl;
|
||||
};
|
||||
|
||||
std::string buffer;
|
||||
auto dbuffer = net::dynamic_buffer(buffer);
|
||||
co_await resp3::async_read(socket, dbuffer); // hello
|
||||
co_await resp3::async_read(socket, dbuffer, adapter);
|
||||
|
||||
@@ -16,9 +16,8 @@ namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
|
||||
using aedis::redis::command;
|
||||
using aedis::generic::make_serializer;
|
||||
using aedis::generic::serializer;
|
||||
using aedis::adapter::adapt;
|
||||
using net::dynamic_buffer;
|
||||
using net::ip::tcp;
|
||||
|
||||
int main()
|
||||
@@ -31,19 +30,17 @@ int main()
|
||||
net::connect(socket, res);
|
||||
|
||||
// Creates the request and writes to the socket.
|
||||
std::string buffer;
|
||||
auto sr = make_serializer(buffer);
|
||||
sr.push(command::hello, 3);
|
||||
sr.push(command::ping);
|
||||
sr.push(command::quit);
|
||||
net::write(socket, net::buffer(buffer));
|
||||
buffer.clear();
|
||||
serializer<command> req;
|
||||
req.push(command::hello, 3);
|
||||
req.push(command::ping);
|
||||
req.push(command::quit);
|
||||
resp3::write(socket, req);
|
||||
|
||||
// Responses
|
||||
std::string resp;
|
||||
std::string buffer, resp;
|
||||
|
||||
// Reads the responses to all commands in the request.
|
||||
auto dbuffer = dynamic_buffer(buffer);
|
||||
auto dbuffer = net::dynamic_buffer(buffer);
|
||||
resp3::read(socket, dbuffer);
|
||||
resp3::read(socket, dbuffer, adapt(resp));
|
||||
resp3::read(socket, dbuffer);
|
||||
|
||||
@@ -22,7 +22,7 @@ namespace resp3 = aedis::resp3;
|
||||
|
||||
using aedis::resp3::type;
|
||||
using aedis::redis::command;
|
||||
using aedis::generic::make_serializer;
|
||||
using aedis::generic::serializer;
|
||||
using aedis::adapter::adapt;
|
||||
using net::ip::tcp;
|
||||
|
||||
@@ -39,13 +39,12 @@ int main()
|
||||
mystruct in{42, "Some string"};
|
||||
|
||||
// Creates and sends a request to redis.
|
||||
std::string request;
|
||||
auto sr = make_serializer(request);
|
||||
sr.push(command::hello, 3);
|
||||
sr.push(command::set, "key", in);
|
||||
sr.push(command::get, "key");
|
||||
sr.push(command::quit);
|
||||
net::write(socket, net::buffer(request));
|
||||
serializer<command> req;
|
||||
req.push(command::hello, 3);
|
||||
req.push(command::set, "key", in);
|
||||
req.push(command::get, "key");
|
||||
req.push(command::quit);
|
||||
resp3::write(socket, req);
|
||||
|
||||
// Object to store the response.
|
||||
mystruct out;
|
||||
|
||||
@@ -19,7 +19,7 @@ namespace resp3 = aedis::resp3;
|
||||
using aedis::resp3::node;
|
||||
using aedis::redis::command;
|
||||
using aedis::adapter::adapt;
|
||||
using aedis::generic::make_serializer;
|
||||
using aedis::generic::serializer;
|
||||
using net::ip::tcp;
|
||||
using net::write;
|
||||
using net::buffer;
|
||||
@@ -34,11 +34,10 @@ net::awaitable<void> example()
|
||||
tcp_socket socket{ex};
|
||||
co_await socket.async_connect(*std::begin(res));
|
||||
|
||||
std::string request;
|
||||
auto sr = make_serializer(request);
|
||||
sr.push(command::hello, 3);
|
||||
sr.push(command::subscribe, "channel1", "channel2");
|
||||
co_await net::async_write(socket, buffer(request));
|
||||
serializer<command> req;
|
||||
req.push(command::hello, 3);
|
||||
req.push(command::subscribe, "channel1", "channel2");
|
||||
co_await resp3::async_write(socket, req);
|
||||
|
||||
// Ignores the response to hello.
|
||||
std::string buffer;
|
||||
|
||||
@@ -17,7 +17,7 @@ namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
using aedis::redis::command;
|
||||
using aedis::adapter::adapt;
|
||||
using aedis::generic::make_serializer;
|
||||
using aedis::generic::serializer;
|
||||
using net::ip::tcp;
|
||||
using tcp_socket = net::use_awaitable_t<>::as_default_on_t<net::ip::tcp::socket>;
|
||||
|
||||
@@ -30,15 +30,14 @@ net::awaitable<void> example()
|
||||
tcp_socket socket{ex};
|
||||
co_await socket.async_connect(*std::begin(res));
|
||||
|
||||
std::string request;
|
||||
auto sr = make_serializer(request);
|
||||
sr.push(command::hello, 3);
|
||||
sr.push(command::multi);
|
||||
sr.push(command::ping, "Some message.");
|
||||
sr.push(command::set, "low-level-key", "some content", "EX", "2");
|
||||
sr.push(command::exec);
|
||||
sr.push(command::quit);
|
||||
co_await net::async_write(socket, net::buffer(request));
|
||||
serializer<command> req;
|
||||
req.push(command::hello, 3);
|
||||
req.push(command::multi);
|
||||
req.push(command::ping, "Some message.");
|
||||
req.push(command::set, "low-level-key", "some content", "EX", "2");
|
||||
req.push(command::exec);
|
||||
req.push(command::quit);
|
||||
co_await resp3::async_write(socket, req);
|
||||
|
||||
std::tuple<std::string, boost::optional<std::string>> response;
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ namespace resp3 = aedis::resp3;
|
||||
|
||||
using aedis::adapter::adapt;
|
||||
using aedis::adapter::adapter_t;
|
||||
using aedis::generic::make_serializer;
|
||||
using aedis::redis::command;
|
||||
using aedis::resp3::node;
|
||||
using client_type = aedis::generic::client<net::ip::tcp::socket, command>;
|
||||
@@ -265,13 +264,13 @@ net::awaitable<void> reader5(std::shared_ptr<client_type> db)
|
||||
{
|
||||
auto [ec, cmd, n] = co_await db->async_read_one(as_tuple(net::use_awaitable));
|
||||
expect_error(ec, error_code{});
|
||||
expect_eq(cmd, command::hello);
|
||||
expect_eq(cmd, command::hello, "Expects hello.");
|
||||
}
|
||||
|
||||
{
|
||||
auto [ec, cmd, n] = co_await db->async_read_one(as_tuple(net::use_awaitable));
|
||||
expect_error(ec, error_code{});
|
||||
expect_eq(cmd, command::quit);
|
||||
expect_eq(cmd, command::quit, "Expects quit.");
|
||||
}
|
||||
|
||||
{
|
||||
@@ -301,8 +300,7 @@ void test_reconnect()
|
||||
|
||||
auto on_write = [i = 0, db](std::size_t) mutable
|
||||
{
|
||||
if (i++ < 3)
|
||||
db->send(command::quit);
|
||||
db->send(command::quit);
|
||||
};
|
||||
|
||||
db->set_write_handler(on_write);
|
||||
|
||||
@@ -17,7 +17,7 @@ namespace net = boost::asio;
|
||||
namespace resp3 = aedis::resp3;
|
||||
|
||||
using aedis::redis::command;
|
||||
using aedis::generic::make_serializer;
|
||||
using aedis::generic::serializer;
|
||||
using aedis::resp3::node;
|
||||
using aedis::adapter::adapt;
|
||||
using net::ip::tcp;
|
||||
@@ -81,12 +81,11 @@ int main()
|
||||
tcp::socket socket{ioc};
|
||||
net::connect(socket, res);
|
||||
|
||||
std::string request;
|
||||
auto sr = make_serializer(request);
|
||||
sr.push(command::hello, 3);
|
||||
sr.push(command::command);
|
||||
sr.push(command::quit);
|
||||
write(socket, buffer(request));
|
||||
serializer<command> req;
|
||||
req.push(command::hello, 3);
|
||||
req.push(command::command);
|
||||
req.push(command::quit);
|
||||
write(socket, buffer(req.payload()));
|
||||
|
||||
std::vector<node<std::string>> resp;
|
||||
|
||||
@@ -104,4 +103,3 @@ int main()
|
||||
std::cerr << e.what() << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user