2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Fixes some examples.

This commit is contained in:
Marcelo Zimbres
2022-02-20 19:43:01 +01:00
parent ee71ff885d
commit feb383d7e2
8 changed files with 117 additions and 135 deletions

View File

@@ -76,7 +76,7 @@ nobase_include_HEADERS =\
$(top_srcdir)/aedis/resp3/detail/impl/parser.ipp\
$(top_srcdir)/aedis/resp3/impl/type.ipp\
$(top_srcdir)/aedis/resp3/impl/node.ipp\
$(top_srcdir)/aedis/redis/experimental/client.hpp
$(top_srcdir)/aedis/redis/client.hpp
nobase_noinst_HEADERS =\
$(top_srcdir)/examples/lib/net_utils.hpp\

View File

@@ -16,7 +16,7 @@
#include <aedis/resp3/error.hpp>
#include <aedis/resp3/serializer.hpp>
#include <aedis/resp3/response_traits.hpp>
#include <aedis/redis/experimental/client.hpp>
#include <aedis/redis/client.hpp>
/** \mainpage Documentation
\tableofcontents

View File

@@ -23,7 +23,6 @@
namespace aedis {
namespace redis {
namespace experimental {
inline
auto adapt()
@@ -207,12 +206,11 @@ struct writer_op {
self.complete(ec);
return;
}
}
}
};
// TODO: is it correct to call stop_writer?
// TODO: Is it correct to call stop_writer?
template <class Client>
struct read_op {
Client* cli;
@@ -246,6 +244,7 @@ struct read_op {
assert(!std::empty(cli->read_buffer_));
t = resp3::detail::to_type(cli->read_buffer_.front());
cmd = redis::command::unknown;
if (t != resp3::type::push) {
assert(!std::empty(cli->commands_));
cmd = cli->commands_.front();
@@ -376,6 +375,12 @@ private:
return false;
}
void stop_writer()
{
stop_writer_ = true;
timer_.cancel();
}
// Returns true when the next request can be writen.
bool on_read()
{
@@ -392,6 +397,59 @@ private:
return !std::empty(req_info_);
}
// Reads messages asynchronously.
template <class CompletionToken = default_completion_token_type>
auto async_reader(CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(read_op<client>{this}, token, socket_);
}
template <class CompletionToken = default_completion_token_type>
auto async_writer(CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(writer_op<client>{this}, token, socket_, timer_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_resolve(
std::string const& host = "localhost",
std::string const& service = "6379",
CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(resolve_op<client>{this, host, service}, token, resolver_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_connect(
CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(connect_op<client>{this}, token, socket_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_read_write(
CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(read_write_op<client>{this}, token, socket_, timer_, resolver_);
}
public:
/** \brief Client constructor.
*
@@ -407,19 +465,9 @@ public:
timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
stream_type& next_layer() {return socket_;}
stream_type const& next_layer() const {return socket_;}
/// Returns the executor used for I/O with Redis.
auto get_executor() {return socket_.get_executor();}
void stop_writer()
{
stop_writer_ = true;
timer_.cancel();
}
void set_writter_callback(writer_callback_type wcallback)
{ wcallback_ = std::move(wcallback); }
@@ -478,70 +526,6 @@ public:
timer_.cancel_one();
}
// Reads messages asynchronously.
template <class CompletionToken = default_completion_token_type>
auto async_reader(CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(read_op<client>{this}, token, socket_);
}
template <class CompletionToken = default_completion_token_type>
auto async_writer(CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(writer_op<client>{this}, token, socket_, timer_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_resolve(
std::string const& host = "localhost",
std::string const& service = "6379",
CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(resolve_op<client>{this, host, service}, token, resolver_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_connect(
CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(connect_op<client>{this}, token, socket_);
}
net::awaitable<void>
async_connect(
std::string const& host = "localhost",
std::string const& service = "6379")
{
auto ex = co_await net::this_coro::executor;
co_await async_resolve(host, service, net::use_awaitable);
co_await async_connect(net::use_awaitable);
}
template <class CompletionToken = default_completion_token_type>
auto
async_read_write(
CompletionToken&& token = default_completion_token_type{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(read_write_op<client>{this}, token, socket_, timer_, resolver_);
}
template <class CompletionToken = default_completion_token_type>
auto
async_run(
@@ -556,6 +540,5 @@ public:
}
};
} // experimental
} // redis
} // aedis

View File

@@ -32,7 +32,7 @@ struct node {
std::size_t depth;
/// The actual data. For aggregate data types this is always empty.
std::string data;
std::string data; // TODO: rename to value.
};
/** \brief Converts the node to a string.

View File

@@ -15,21 +15,18 @@
#include "lib/net_utils.hpp"
namespace net = aedis::net;
namespace redis = aedis::redis;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::redis::experimental::adapt;
using aedis::redis::client;
using aedis::redis::adapt;
using aedis::resp3::node;
using aedis::resp3::type;
using aedis::user_session;
using aedis::user_session_base;
// From lib/net_utils.hpp
using aedis::connect;
using aedis::reader;
using aedis::signal_handler;
using aedis::connection_manager;
using socket_type = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::socket>;
using client_type = client<socket_type>;
using client_type = client<aedis::net::ip::tcp::socket>;
// TODO: Delete sessions that have expired.
class receiver : public std::enable_shared_from_this<receiver> {
@@ -46,17 +43,17 @@ public:
{
switch (cmd) {
case command::hello:
db_->send(command::subscribe, "channel");
break;
db_->send(command::subscribe, "channel");
break;
case command::incr:
std::cout << "Message so far: " << resps_.front().data << std::endl;
break;
std::cout << "Message so far: " << resps_.front().data << std::endl;
break;
case command::unknown: // Server push
for (auto& session: sessions_)
session->deliver(resps_.at(3).data);
break;
for (auto& session: sessions_)
session->deliver(resps_.at(3).data);
break;
default: { /* Ignore */ }
}
@@ -64,8 +61,8 @@ public:
resps_.clear();
}
auto get_adapter()
{ return aedis::redis::experimental::adapt(resps_); }
auto adapter()
{ return redis::adapt(resps_); }
auto add(std::shared_ptr<user_session_base> session)
{ sessions_.push_back(session); }
@@ -79,9 +76,15 @@ net::awaitable<void> listener()
auto acc = std::make_shared<net::ip::tcp::acceptor>(ex, endpoint);
auto db = std::make_shared<client_type>(ex);
auto recv = std::make_shared<receiver>(db);
db->set_response_adapter(recv->adapter());
db->set_reader_callback([recv](command cmd) {recv->on_message(cmd);});
auto on_run = [](auto const ec)
{ std::clog << "Lost connection to redis: " << ec.message() << std::endl;};
db->async_run("localhost", "6379", on_run);
net::co_spawn(ex, signal_handler(acc, db), net::detached);
net::co_spawn(ex, connection_manager(db, reader(db, recv)), net::detached);
auto on_user_msg = [db](std::string const& msg)
{

View File

@@ -16,20 +16,16 @@
#include "lib/net_utils.hpp"
namespace net = aedis::net;
namespace redis = aedis::redis;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::redis::experimental::adapt;
using aedis::redis::client;
using aedis::resp3::node;
using aedis::user_session;
using aedis::user_session_base;
// From lib/net_utils.hpp
using aedis::connect;
using aedis::signal_handler;
using aedis::reader;
using aedis::connection_manager;
using socket_type = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::socket>;
using client_type = client<socket_type>;
using client_type = client<aedis::net::ip::tcp::socket>;
class receiver : public std::enable_shared_from_this<receiver> {
private:
@@ -56,8 +52,8 @@ public:
resps_.clear();
}
auto get_adapter()
{ return aedis::redis::experimental::adapt(resps_); }
auto adapter()
{ return redis::adapt(resps_); }
void add_user_session(std::shared_ptr<user_session_base> session)
{ sessions_.push(session); }
@@ -70,10 +66,17 @@ net::awaitable<void> listener()
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<net::ip::tcp::acceptor>(ex, endpoint);
auto db = std::make_shared<client_type>(ex);
auto recv = std::make_shared<receiver>();
db->set_response_adapter(recv->adapter());
db->set_reader_callback([recv](command cmd) {recv->on_message(cmd);});
auto on_run = [](auto const ec)
{ std::clog << "Lost connection to redis: " << ec.message() << std::endl;};
db->async_run("localhost", "6379", on_run);
net::co_spawn(ex, signal_handler(acc, db), net::detached);
net::co_spawn(ex, connection_manager(db, reader(db, recv)), net::detached);
for (;;) {
auto socket = co_await acc->async_accept(net::use_awaitable);
@@ -93,7 +96,7 @@ net::awaitable<void> listener()
int main()
{
try {
net::io_context ioc{1};
net::io_context ioc;
co_spawn(ioc, listener(), net::detached);
ioc.run();
} catch (std::exception const& e) {

View File

@@ -12,15 +12,15 @@
#include <aedis/src.hpp>
namespace net = aedis::net;
namespace redis = aedis::redis::experimental;
namespace redis = aedis::redis;
using aedis::redis::command;
using aedis::redis::experimental::client;
using aedis::redis::client;
using aedis::resp3::node;
using client_type = client<aedis::net::ip::tcp::socket>;
class receiver {
private:
std::vector<node> resps_;
node resps_;
std::shared_ptr<client_type> db_;
public:
@@ -34,11 +34,23 @@ public:
db_->send(command::incr, "redis-client-counter");
db_->send(command::quit);
break;
case command::ping:
std::cout << "Ping message: " << resps_.data << std::endl;
break;
case command::incr:
std::cout << "Ping counter: " << resps_.data << std::endl;
break;
case command::quit:
std::cout << command::quit << ": " << resps_.data << std::endl;
break;
default:;
}
std::cout << cmd << " " << resps_.at(0).data << std::endl;
resps_.clear();
resps_.data.clear();
}
auto adapter() { return redis::adapt(resps_); }

View File

@@ -28,30 +28,11 @@ connect(
co_return std::move(socket);
}
template <class Socket, class Receiver>
net::awaitable<void> reader(
std::shared_ptr<redis::experimental::client<Socket>> db,
std::shared_ptr<Receiver> recv)
{
db->send(redis::command::hello, 3);
for (auto adapter = recv->get_adapter();;) {
boost::system::error_code ec;
auto const cmd = co_await db->async_read(adapter, net::redirect_error(net::use_awaitable, ec));
if (ec) {
db->stop_writer();
co_return;
}
recv->on_message(cmd);
}
}
template <class Socket>
aedis::net::awaitable<void>
signal_handler(
std::shared_ptr<aedis::net::ip::tcp::acceptor> acc,
std::shared_ptr<aedis::redis::experimental::client<Socket>> db)
std::shared_ptr<aedis::redis::client<Socket>> db)
{
auto ex = co_await aedis::net::this_coro::executor;
@@ -70,7 +51,7 @@ signal_handler(
template <class T, class Socket>
net::awaitable<void>
connection_manager(
std::shared_ptr<redis::experimental::client<Socket>> db,
std::shared_ptr<redis::client<Socket>> db,
net::awaitable<T> reader)
{
using namespace net::experimental::awaitable_operators;