diff --git a/examples/pubsub.cpp b/examples/pubsub.cpp index b54d1f0f..2abd3fb3 100644 --- a/examples/pubsub.cpp +++ b/examples/pubsub.cpp @@ -6,6 +6,7 @@ */ #include +#include #include @@ -13,8 +14,28 @@ using namespace aedis; -net::awaitable example() +net::awaitable +publisher(tcp_socket& socket, std::queue& requests) { + auto ex = net::this_coro::executor; + //timer st(ex); + + for (auto i = 0; i < 4; ++i) { + if (!socket.is_open()) + co_return; + + prepare_next(requests); + requests.back().publish("channel1", "Message to channel1"); + requests.back().publish("channel2", "Message to channel2"); + //st.expires_after(std::chrono::seconds{1}); + //co_await st.async_wait(); + co_await resp3::detail::async_write_some(socket, requests); + } +} + +net::awaitable subscriber() +{ + auto ex = net::this_coro::executor; auto socket = co_await make_connection(); std::queue requests; @@ -25,30 +46,15 @@ net::awaitable example() for (;;) { resp3::response resp; co_await cs.async_consume(socket, requests, resp); - std::cout << resp << std::endl; - if (resp.get_type() == resp3::type::push) + if (resp.get_type() == resp3::type::push) { + std::cout << "Received a server push\n" << resp << std::endl; continue; + } - auto const& elem = requests.front().elements.front(); - - std::cout << elem << std::endl; - switch (elem.cmd) { - case command::hello: - { - prepare_next(requests); - requests.back().ping(); - requests.back().subscribe("some-channel"); - } break; - case command::publish: break; - case command::quit: break; - case command::ping: - { - prepare_next(requests); - requests.back().publish("some-channel", "Some message"); - requests.back().quit(); - } break; - default: { } + if (requests.front().elements.front().cmd == command::hello) { + prepare_next(requests); + requests.back().subscribe({"channel1", "channel2"}); } } } @@ -56,6 +62,6 @@ net::awaitable example() int main() { net::io_context ioc; - co_spawn(ioc, example(), net::detached); + co_spawn(ioc, subscriber(), net::detached); ioc.run(); } diff --git a/examples/types.hpp b/examples/types.hpp index 79930427..19aeaf3e 100644 --- a/examples/types.hpp +++ b/examples/types.hpp @@ -11,3 +11,4 @@ using tcp_socket = aedis::net::use_awaitable_t<>::as_default_on_t; using tcp_resolver = aedis::net::use_awaitable_t<>::as_default_on_t; +using timer = aedis::net::use_awaitable_t<>::as_default_on_t; diff --git a/include/aedis/resp3/detail/read.hpp b/include/aedis/resp3/detail/read.hpp index 0ef01451..d8136d42 100644 --- a/include/aedis/resp3/detail/read.hpp +++ b/include/aedis/resp3/detail/read.hpp @@ -206,10 +206,11 @@ struct consumer_op { yield self.complete(ec, m_type); if (m_type != type::push) - requests.front().elements.pop(); + requests.front().elements.pop(); - } while (!std::empty(requests.front().elements)); - requests.pop(); + } while (!std::empty(requests) && !std::empty(requests.front().elements)); + if (!std::empty(requests)) + requests.pop(); } while (std::empty(requests)); } } diff --git a/include/aedis/resp3/detail/write.hpp b/include/aedis/resp3/detail/write.hpp index da43c2b7..1a1ffe13 100644 --- a/include/aedis/resp3/detail/write.hpp +++ b/include/aedis/resp3/detail/write.hpp @@ -76,10 +76,8 @@ struct write_some_op { if (ec) break; - requests.front().sent = true; - if (std::empty(requests.front().elements)) { - // We only pop when all commands in the pipeline has push + // We only pop when all commands in the pipeline have push // responses like subscribe, otherwise, pop is done when the // response arrives. requests.pop(); @@ -93,12 +91,15 @@ struct write_some_op { template< class AsyncWriteStream, - class CompletionToken> + class CompletionToken = + net::default_completion_token_t + > auto async_write_some( AsyncWriteStream& stream, std::queue& requests, - CompletionToken&& token) + CompletionToken&& token = + net::default_completion_token_t{}) { return net::async_compose< CompletionToken, diff --git a/include/aedis/resp3/impl/request.ipp b/include/aedis/resp3/impl/request.ipp index ffccce1b..a915ecff 100644 --- a/include/aedis/resp3/impl/request.ipp +++ b/include/aedis/resp3/impl/request.ipp @@ -12,17 +12,11 @@ namespace resp3 { bool prepare_next(std::queue& reqs) { - if (std::empty(reqs)) { + auto const cond = std::empty(reqs) || std::size(reqs) == 1; + if (cond) reqs.push({}); - return true; - } - if (reqs.back().sent) { - reqs.push({}); - return false; - } - - return false; + return cond; } std::ostream& operator<<(std::ostream& os, request::element const& e) diff --git a/include/aedis/resp3/request.hpp b/include/aedis/resp3/request.hpp index a39c3574..87eebe28 100644 --- a/include/aedis/resp3/request.hpp +++ b/include/aedis/resp3/request.hpp @@ -37,7 +37,6 @@ public: }; std::string payload; std::queue elements; - bool sent = false; public: /// Return the size of the pipeline. i.e. how many commands it @@ -138,13 +137,20 @@ public: elements.emplace(command::lpop, std::string{key}); } - /// Adds ping to the request, see https://redis.io/commands/subscribe - void subscribe(std::string_view key) + /// Adds subscribe to the request, see https://redis.io/commands/subscribe + void subscribe(std::initializer_list l) { // The response to this command is a push type. - detail::assemble(payload, "SUBSCRIBE", key); + std::initializer_list dummy = {}; + detail::assemble(payload, "SUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy)); } + void psubscribe(std::initializer_list l) + { + std::initializer_list dummy = {}; + detail::assemble(payload, "PSUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy)); + } + /// Adds ping to the request, see https://redis.io/commands/unsubscribe void unsubscribe(std::string_view key) { @@ -236,12 +242,6 @@ public: elements.emplace(command::lpush, std::string{key}); } - void psubscribe( std::initializer_list l) - { - std::initializer_list dummy = {}; - detail::assemble(payload, "PSUBSCRIBE", l, std::cbegin(dummy), std::cend(dummy)); - } - /// Adds ping to the request, see https://redis.io/commands/publish void publish(std::string_view key, std::string_view msg) { diff --git a/tests/general.cpp b/tests/general.cpp index 97bb2d20..7623a069 100644 --- a/tests/general.cpp +++ b/tests/general.cpp @@ -76,7 +76,7 @@ struct test_general_fill { p.get("b"); p.append("b", "b"); p.del("b"); - p.subscribe("channel"); + p.subscribe({"channel"}); p.publish("channel", "message"); p.incr("c");