2
0
mirror of https://github.com/boostorg/redis.git synced 2026-02-02 21:12:16 +00:00

Some simplifications and a bugfix.

This commit is contained in:
Marcelo Zimbres
2021-10-17 12:25:00 +02:00
parent cd4b57f94f
commit dcd026fb0b
7 changed files with 54 additions and 51 deletions

View File

@@ -6,6 +6,7 @@
*/
#include <iostream>
#include <chrono>
#include <aedis/aedis.hpp>
@@ -13,8 +14,28 @@
using namespace aedis;
net::awaitable<void> example()
net::awaitable<void>
publisher(tcp_socket& socket, std::queue<resp3::request>& requests)
{
auto ex = net::this_coro::executor;
//timer st(ex);
for (auto i = 0; i < 4; ++i) {
if (!socket.is_open())
co_return;
prepare_next(requests);
requests.back().publish("channel1", "Message to channel1");
requests.back().publish("channel2", "Message to channel2");
//st.expires_after(std::chrono::seconds{1});
//co_await st.async_wait();
co_await resp3::detail::async_write_some(socket, requests);
}
}
net::awaitable<void> subscriber()
{
auto ex = net::this_coro::executor;
auto socket = co_await make_connection();
std::queue<resp3::request> requests;
@@ -25,30 +46,15 @@ net::awaitable<void> example()
for (;;) {
resp3::response resp;
co_await cs.async_consume(socket, requests, resp);
std::cout << resp << std::endl;
if (resp.get_type() == resp3::type::push)
if (resp.get_type() == resp3::type::push) {
std::cout << "Received a server push\n" << resp << std::endl;
continue;
}
auto const& elem = requests.front().elements.front();
std::cout << elem << std::endl;
switch (elem.cmd) {
case command::hello:
{
prepare_next(requests);
requests.back().ping();
requests.back().subscribe("some-channel");
} break;
case command::publish: break;
case command::quit: break;
case command::ping:
{
prepare_next(requests);
requests.back().publish("some-channel", "Some message");
requests.back().quit();
} break;
default: { }
if (requests.front().elements.front().cmd == command::hello) {
prepare_next(requests);
requests.back().subscribe({"channel1", "channel2"});
}
}
}
@@ -56,6 +62,6 @@ net::awaitable<void> example()
int main()
{
net::io_context ioc;
co_spawn(ioc, example(), net::detached);
co_spawn(ioc, subscriber(), net::detached);
ioc.run();
}

View File

@@ -11,3 +11,4 @@
using tcp_socket = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::socket>;
using tcp_resolver = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::ip::tcp::resolver>;
using timer = aedis::net::use_awaitable_t<>::as_default_on_t<aedis::net::steady_timer>;

View File

@@ -206,10 +206,11 @@ struct consumer_op {
yield self.complete(ec, m_type);
if (m_type != type::push)
requests.front().elements.pop();
requests.front().elements.pop();
} while (!std::empty(requests.front().elements));
requests.pop();
} while (!std::empty(requests) && !std::empty(requests.front().elements));
if (!std::empty(requests))
requests.pop();
} while (std::empty(requests));
}
}

View File

@@ -76,10 +76,8 @@ struct write_some_op {
if (ec)
break;
requests.front().sent = true;
if (std::empty(requests.front().elements)) {
// We only pop when all commands in the pipeline has push
// We only pop when all commands in the pipeline have push
// responses like subscribe, otherwise, pop is done when the
// response arrives.
requests.pop();
@@ -93,12 +91,15 @@ struct write_some_op {
template<
class AsyncWriteStream,
class CompletionToken>
class CompletionToken =
net::default_completion_token_t<typename AsyncWriteStream::executor_type>
>
auto
async_write_some(
AsyncWriteStream& stream,
std::queue<request>& requests,
CompletionToken&& token)
CompletionToken&& token =
net::default_completion_token_t<typename AsyncWriteStream::executor_type>{})
{
return net::async_compose<
CompletionToken,

View File

@@ -12,17 +12,11 @@ namespace resp3 {
bool prepare_next(std::queue<request>& reqs)
{
if (std::empty(reqs)) {
auto const cond = std::empty(reqs) || std::size(reqs) == 1;
if (cond)
reqs.push({});
return true;
}
if (reqs.back().sent) {
reqs.push({});
return false;
}
return false;
return cond;
}
std::ostream& operator<<(std::ostream& os, request::element const& e)

View File

@@ -37,7 +37,6 @@ public:
};
std::string payload;
std::queue<element> elements;
bool sent = false;
public:
/// Return the size of the pipeline. i.e. how many commands it
@@ -138,13 +137,20 @@ public:
elements.emplace(command::lpop, std::string{key});
}
/// Adds ping to the request, see https://redis.io/commands/subscribe
void subscribe(std::string_view key)
/// Adds subscribe to the request, see https://redis.io/commands/subscribe
void subscribe(std::initializer_list<std::string_view> l)
{
// The response to this command is a push type.
detail::assemble(payload, "SUBSCRIBE", key);
std::initializer_list<std::string_view> dummy = {};
detail::assemble(payload, "SUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy));
}
void psubscribe(std::initializer_list<std::string_view> l)
{
std::initializer_list<std::string_view> dummy = {};
detail::assemble(payload, "PSUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy));
}
/// Adds ping to the request, see https://redis.io/commands/unsubscribe
void unsubscribe(std::string_view key)
{
@@ -236,12 +242,6 @@ public:
elements.emplace(command::lpush, std::string{key});
}
void psubscribe( std::initializer_list<std::string_view> l)
{
std::initializer_list<std::string_view> dummy = {};
detail::assemble(payload, "PSUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy));
}
/// Adds ping to the request, see https://redis.io/commands/publish
void publish(std::string_view key, std::string_view msg)
{

View File

@@ -76,7 +76,7 @@ struct test_general_fill {
p.get("b");
p.append("b", "b");
p.del("b");
p.subscribe("channel");
p.subscribe({"channel"});
p.publish("channel", "message");
p.incr("c");