/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com) * * Distributed under the Boost Software License, Version 1.0. (See * accompanying file LICENSE.txt) */ #include #include #include #include #include #include #include #include "user_session.hpp" namespace net = boost::asio; using aedis::resp3::node; using aedis::adapter::adapt; using aedis::adapter::adapter_t; using aedis::redis::command; using aedis::generic::client; using aedis::user_session; using aedis::user_session_base; using client_type = client; using response_type = std::vector>; class receiver { public: receiver(std::shared_ptr db) : adapter_{adapt(resp_)} , db_{db} {} void on_resp3(command cmd, node const& nd, boost::system::error_code& ec) { adapter_(nd, ec); } void on_read(command cmd, std::size_t) { switch (cmd) { case command::ping: if (resp_.front().value != "PONG") { sessions_.front()->deliver(resp_.front().value); sessions_.pop(); } break; case command::incr: std::cout << "Echos so far: " << resp_.front().value << std::endl; break; default: /* Ignore */; } resp_.clear(); } void on_write(std::size_t n) { std::cout << "Number of bytes written: " << n << std::endl; } void on_push(std::size_t n) { } void add_user_session(std::shared_ptr session) { sessions_.push(session); } private: response_type resp_; adapter_t adapter_; std::shared_ptr db_; std::queue> sessions_; }; net::awaitable listener( std::shared_ptr acc, std::shared_ptr db, std::shared_ptr recv) { for (;;) { auto socket = co_await acc->async_accept(net::use_awaitable); auto session = std::make_shared(std::move(socket)); auto on_user_msg = [db, recv, session](std::string const& msg) { if (!msg.empty()) { db->send(command::ping, msg); db->send(command::incr, "echo-counter"); recv->add_user_session(session); } }; session->start(on_user_msg); } } int main() { try { net::io_context ioc; auto db = std::make_shared(ioc.get_executor()); auto recv = std::make_shared(db); db->set_receiver(recv); // TODO: Close the listener when async_run returns. db->async_run("127.0.0.1", "6379", [](auto ec){ std::cout << ec.message() << std::endl;}); auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555}; auto acc = std::make_shared(ioc.get_executor(), endpoint); co_spawn(ioc, listener(acc, db, recv), net::detached); net::signal_set signals(ioc.get_executor(), SIGINT, SIGTERM); signals.async_wait([&] (auto, int) { ioc.stop(); }); ioc.run(); } catch (std::exception const& e) { std::cerr << e.what() << std::endl; } }