diff --git a/Makefile b/Makefile index 8c611f89..ee007187 100644 --- a/Makefile +++ b/Makefile @@ -8,10 +8,17 @@ CPPFLAGS += -std=c++17 -g CPPFLAGS += -I/opt/boost_1_71_0/include CPPFLAGS += -DBOOST_ASIO_CONCURRENCY_HINT_1=BOOST_ASIO_CONCURRENCY_HINT_UNSAFE +all: examples + +Makefile.dep: + -$(CXX) -MM ./*.cpp > $@ + +-include Makefile.dep + examples: % : %.o $(CXX) -o $@ $^ $(CPPFLAGS) -lfmt -lpthread .PHONY: clean clean: - rm -f examples examples.o + rm -f examples examples.o Makefile.dep diff --git a/README.md b/README.md index ea7ebe75..52fe6bf9 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,8 @@ Aedis is a redis client designed with the following in mind * Seamless integration with async code * Based on Boost.Asio and the Networking TS -* Speed as a result of simplicity -* No overhead abstractions * Easy and intuitive as clients for other languages +* Speed as a result of simplicity This library is header only. You only have to include `aedis.hpp` in your project. Current dendencies are @@ -112,7 +111,7 @@ Support for redis sentinel is still not implemented. # Callbacks -It is possible to set some callbacks as shown in the example bellow +The example below shows how to specify the connection and message callbacks ```cpp void example3() diff --git a/aedis.hpp b/aedis.hpp index 5fcbbe2e..1be7e7f5 100644 --- a/aedis.hpp +++ b/aedis.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +23,8 @@ #include #include #include +#include +#include #include @@ -41,6 +44,12 @@ namespace resp struct buffer { std::string data; std::vector res; + + void clear() + { + data.clear(); + res.clear(); + } }; inline @@ -178,8 +187,8 @@ struct read_op { switch (buffer_->data.front()) { case '$': { - // TODO: Do not push in the vector but find a way to - // report nil. + // We may want to consider not pushing in the vector + // but find a way to report nil. if (buffer_->data.compare(1, 2, "-1") == 0) { buffer_->res.push_back({}); --counter_; @@ -232,6 +241,13 @@ auto async_read(tcp::socket& s, resp::buffer* buffer, CompletionToken&& token) } +inline +auto sentinel(std::string const& arg, std::string const& name) +{ + auto par = {name}; + return resp::assemble("SENTINEL", {arg}, std::cbegin(par), std::cend(par)); +} + inline auto append(std::string const& key, std::string const& msg) { @@ -251,6 +267,12 @@ auto bgrewriteaof() return resp::assemble("BGREWRITEAOF"); } +inline +auto role() +{ + return resp::assemble("ROLE"); +} + inline auto bgsave() { @@ -282,6 +304,20 @@ auto rpush( std::string const& key return rpush(key, std::cbegin(v), std::cend(v)); } +template +auto rpush( std::string const& key + , std::deque const& v) +{ + return rpush(key, std::cbegin(v), std::cend(v)); +} + +template +auto rpush( std::string const& key + , std::array const& a) +{ + return rpush(key, std::cbegin(a), std::cend(a)); +} + template auto rpush( std::string const& key , std::list const& l) @@ -289,6 +325,13 @@ auto rpush( std::string const& key return rpush(key, std::cbegin(l), std::cend(l)); } +template +auto rpush( std::string const& key + , std::forward_list const& l) +{ + return rpush(key, std::cbegin(l), std::cend(l)); +} + template auto rpush( std::string const& key , std::set const& s) @@ -296,6 +339,29 @@ auto rpush( std::string const& key return rpush(key, std::cbegin(s), std::cend(s)); } +template +auto rpush( std::string const& key + , std::multiset const& s) +{ + return rpush(key, std::cbegin(s), std::cend(s)); +} + +template +auto rpush( std::string const& key + , std::unordered_set< T, Hash, KeyEqual, Allocator + > const& s) +{ + return rpush(key, std::cbegin(s), std::cend(s)); +} + +template +auto rpush( std::string const& key + , std::unordered_multiset< T, Hash, KeyEqual, Allocator + > const& s) +{ + return rpush(key, std::cbegin(s), std::cend(s)); +} + template auto lpush(std::string const& key, Iter begin, Iter end) { @@ -497,6 +563,76 @@ void write(level filter, level ll, char const* fmt, Args const& ... args) } +struct instance { + std::string host; + std::string port; + std::string name; +}; + +struct instance_op { + enum class states + { starting + , writing + , waiting }; + + tcp::socket& socket_; + resp::buffer* buffer_; + instance* instance_; + states state_ {states::starting}; + std::string cmd_; + + template + void operator()( Self& self + , boost::system::error_code ec = {} + , std::size_t n = 0) + { + switch (state_) { + case states::starting: + { + cmd_ = sentinel("get-master-addr-by-name", instance_->name); + state_ = states::writing; + net::async_write( socket_ + , net::buffer(cmd_) + , std::move(self)); + } break; + case states::writing: + { + if (ec) + return self.complete(ec); + + state_ = states::waiting; + resp::async_read(socket_, buffer_, std::move(self)); + + } break; + case states::waiting: + { + auto n = std::size(buffer_->res); + if (n > 1) { + instance_->host = buffer_->res[0]; + instance_->port = buffer_->res[1]; + } + self.complete(ec); + } break; + default: + { + } + } + } +}; + +template +auto +async_get_instance( tcp::socket& s + , resp::buffer* p + , instance* p2 + , CompletionToken&& token) +{ + return net::async_compose + < CompletionToken + , void(boost::system::error_code) + >(instance_op {s, p, p2}, token, s); +} + class session { public: using on_conn_handler_type = std::function; @@ -506,19 +642,11 @@ public: , std::vector)>; struct config { - std::string host {"127.0.0.1"}; - std::string port {"6379"}; + // A list of redis sentinels e.g. ip1 port1 ip2 port2 ... + std::vector sentinels {"127.0.0.1", "26379"}; + std::string name {"mymaster"}; + std::string role {"master"}; int max_pipeline_size {256}; - - // If the connection to redis is lost, the session tries to reconnect with - // this interval. This will be removed after the implementation of redis - // sentinel is finished. - std::chrono::milliseconds conn_retry_interval {500}; - - // A list of redis sentinels e.g. ip1:port1 ip2:port2 ... - // Not yet in use. - std::vector sentinels; - log::level log_filter {log::level::debug}; }; @@ -528,19 +656,20 @@ private: ip::tcp::resolver resolver_; tcp::socket socket_; - // This variable will be removed after the redis sentinel implementation. net::steady_timer timer_; resp::buffer buffer_; std::queue msg_queue_; - int pipeline_counter_ = 0; + int pipeline_size_ = 0; long long pipeline_id_ = 0; + instance instance_; + int sentinel_idx_ = 0; msg_handler_type msg_handler_ = [this](auto ec, auto const& res) { if (ec) { log::write( cfg_.log_filter , log::level::debug - , "{0}/msg_handler_: {1}." + , "{0}/msg_handler: {1}." , id_ , ec.message()); } @@ -555,47 +684,88 @@ private: on_conn_handler_type conn_handler_ = [](){}; - void start_reading_resp() + void do_read_resp() { buffer_.res.clear(); - auto handler = [this](auto const& ec) + auto f = [this](auto const& ec) { on_resp(ec); }; - resp::async_read(socket_, &buffer_, std::move(handler)); + resp::async_read(socket_, &buffer_, std::move(f)); } - void on_resolve( boost::system::error_code const& ec - , net::ip::tcp::resolver::results_type results) + void + on_instance( std::string const& host + , std::string const& port + , boost::system::error_code ec) + { + buffer_.clear(); + + if (ec) { + log::write( cfg_.log_filter + , log::level::warning + , "{0}/on_instance: {1}. Endpoint: {2}" + , id_ + , ec.message()); + return; + } + + // Closes the connection with the sentinel and connects with the master. + ec = {}; + socket_.close(ec); + if (ec) { + log::write( cfg_.log_filter + , log::level::warning + , "{0}/on_instance: {1}." + , id_ + , ec.message()); + return; + } + + // NOTE: Call sync resolve to prevent asio from starting a new thread. + ec = {}; + auto res = resolver_.resolve(host, port, ec); + if (ec) { + log::write( cfg_.log_filter + , log::level::warning + , "{0}/on_instance: {1}." + , id_ + , ec.message()); + return; + } + + do_connect(res); + } + + void do_connect(net::ip::tcp::resolver::results_type res) + { + auto f = [this](auto ec, auto iter) + { on_connect(ec, iter); }; + + net::async_connect(socket_, res, f); + } + + void on_connect( boost::system::error_code ec + , net::ip::tcp::endpoint const& endpoint) { if (ec) { log::write( cfg_.log_filter , log::level::warning - , "{0}/on_resolve: {1}." - , id_, ec.message()); - return; - } - - auto handler = [this](auto ec, auto iter) - { on_connect(ec, iter); }; - - net::async_connect(socket_, results, handler); - } - - void on_connect( boost::system::error_code const& ec - , net::ip::tcp::endpoint const& endpoint) - { - if (ec) { + , "{0}/on_connect: {1}. Endpoint: {2}" + , id_ + , ec.message() + , endpoint); + run(); return; } log::write( cfg_.log_filter , log::level::info - , "{0}/on_connect: Success connecting to {1}" + , "{0}/Success connecting to redis instance {1}" , id_ , endpoint); - start_reading_resp(); + do_read_resp(); // Consumes any messages that have been eventually posted while the // connection was not established, or consumes msgs when a @@ -618,56 +788,30 @@ private: conn_handler_(); } - void on_resp(boost::system::error_code const& ec) + void on_resp(boost::system::error_code ec) { if (ec) { log::write( cfg_.log_filter , log::level::warning - , "{0}/on_resp1: {1}." + , "{0}/on_resp: {1}." , id_ , ec.message()); - - auto const b1 = ec == net::error::eof; - auto const b2 = ec == net::error::connection_reset; - if (b1 || b2) { - // Redis has cleanly closed the connection, we try to - // reconnect. - timer_.expires_after(cfg_.conn_retry_interval); - - auto const handler = [this](auto const& ec) - { on_conn_closed(ec); }; - - timer_.async_wait(handler); - return; - } - - if (ec == net::error::operation_aborted) { - // The operation has been canceled, this can happen in only - // one way - // - // 1. There has been a request from the worker to close the - // connection and leave. In this case we should NOT try to - // reconnect. We have nothing to do. - return; - } - - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_resp2: Unhandled error '{1}'." - , id_, ec.message()); - + // Some of the possible errors are. + // net::error::eof + // net::error::connection_reset + // net::error::operation_aborted + close(); + run(); return; } msg_handler_(ec, std::move(buffer_.res)); - start_reading_resp(); + do_read_resp(); if (std::empty(msg_queue_)) return; - msg_queue_.pop(); - if (!std::empty(msg_queue_)) do_write(); } @@ -681,44 +825,18 @@ private: , id_, ec.message()); return; } + + msg_queue_.pop(); } void do_write() { - auto handler = [this](auto ec, auto n) + auto f = [this](auto ec, auto n) { on_write(ec, n); }; - net::async_write(socket_, net::buffer(msg_queue_.front()), handler); - } - - void on_conn_closed(boost::system::error_code ec) - { - if (ec) { - if (ec == net::error::operation_aborted) { - // The timer has been canceled. Probably somebody - // shutting down the application while we are trying to - // reconnect. - return; - } - - log::write( cfg_.log_filter - , log::level::warning - , "{0}/on_conn_closed: {1}" - , id_ - , ec.message()); - - return; - } - - // Given that the peer has shutdown the connection (I think) - // we do not need to call shutdown. - //socket_.shutdown(net::ip::tcp::socket::shutdown_both, ec); - socket_.close(ec); - - // Instead of simply trying to reconnect I will run the - // resolver again. This will be changes when sentinel - // support is implemented. - run(); + net::async_write( socket_ + , net::buffer(msg_queue_.front()) + , f); } public: @@ -737,19 +855,19 @@ public: session(net::io_context& ioc) : session {ioc, {}, {}} { } - void set_on_conn_handler(on_conn_handler_type handler) - { conn_handler_ = std::move(handler);}; + void set_on_conn_handler(on_conn_handler_type f) + { conn_handler_ = std::move(f);}; - void set_msg_handler(msg_handler_type handler) - { msg_handler_ = std::move(handler);}; + void set_msg_handler(msg_handler_type f) + { msg_handler_ = std::move(f);}; auto send(std::string msg) { auto const max_pp_size_reached = - pipeline_counter_ >= cfg_.max_pipeline_size; + pipeline_size_ >= cfg_.max_pipeline_size; if (max_pp_size_reached) - pipeline_counter_ = 0; + pipeline_size_ = 0; auto const is_empty = std::empty(msg_queue_); @@ -758,7 +876,7 @@ public: ++pipeline_id_; } else { msg_queue_.back() += msg; // Uses pipeline. - ++pipeline_counter_; + ++pipeline_size_; } if (is_empty && socket_.is_open()) @@ -789,20 +907,91 @@ public: , id_ , ec.message()); } - - timer_.cancel(); } + void + on_sentinel_conn( boost::system::error_code ec + , net::ip::tcp::endpoint const& endpoint) + { + if (ec) { + log::write( cfg_.log_filter + , log::level::warning + , "{0}/on_sentinel_conn: {1}. Endpoint: {2}" + , id_ + , ec.message() + , endpoint); + + // Ask the next sentinel only if we did not try them all yet. + ++sentinel_idx_; + if ((2 * sentinel_idx_) == std::size(cfg_.sentinels)) { + log::write( cfg_.log_filter + , log::level::warning + , "{0}/No sentinel knows the redis instance address." + , id_); + + return; + } + + run(); // Tries the next sentinel. + return; + } + + // The redis documentation recommends to put the first sentinel that + // replies in the start of the list. See + // https://redis.io/topics/sentinel-clients + if (sentinel_idx_ != 0) { + auto const r = sentinel_idx_; + std::swap(cfg_.sentinels[0], cfg_.sentinels[2 * r]); + std::swap(cfg_.sentinels[1], cfg_.sentinels[2 * r + 1]); + sentinel_idx_ = 0; + } + + log::write( cfg_.log_filter + , log::level::info + , "{0}/Success connecting to sentinel {1}" + , id_ + , endpoint); + + auto g = [this](auto ec) + { on_instance(instance_.host, instance_.port, ec); }; + + instance_.name = cfg_.name; + async_get_instance(socket_, &buffer_, &instance_, std::move(g)); + } void run() { - //auto addr = split(cfg_.sentinels.front()); - //std::cout << addr.first << " -- " << addr.second << std::endl; + auto const n = std::size(cfg_.sentinels); + + if (n == 0 || (n % 2 != 0)) { + log::write( cfg_.log_filter + , log::level::warning + , "{0}/run: Incompatible sentinels array size: {1}" + , id_, n); + return; + } + + auto const r = sentinel_idx_; - // Calling sync resolve to avoid starting a new thread. boost::system::error_code ec; - auto res = resolver_.resolve(cfg_.host, cfg_.port, ec); - on_resolve(ec, res); + auto res = resolver_ + .resolve( cfg_.sentinels[2 * r] + , cfg_.sentinels[2 * r + 1] + , ec); + + if (ec) { + log::write( cfg_.log_filter + , log::level::warning + , "{0}/run: Can't resolve sentinel: {1}." + , id_ + , ec.message()); + return; + } + + auto f = [this](auto ec, auto iter) + { on_sentinel_conn(ec, iter); }; + + net::async_connect(socket_, res, f); } }; diff --git a/examples.cpp b/examples.cpp index cc1b224c..ee7c0248 100644 --- a/examples.cpp +++ b/examples.cpp @@ -20,6 +20,58 @@ void send(std::string cmd) ioc.run(); } +void rpush_ex() +{ + std::array a + {"a1", "a2", "a3"}; + + std::vector b + {"b1" ,"b2", "b3"}; + + std::list c + {"c1" ,"c2", "c3"}; + + std::set d + {"d1" ,"d2", "d3"}; + + std::deque e + {"e1" ,"e2", "e3"}; + + std::forward_list f + {"f1" ,"f2", "f3"}; + + std::multiset g + {"g1" ,"g2", "g3"}; + + std::unordered_set h + {"h1" ,"h2", "h3"}; + + std::unordered_set i + {"i1" ,"i2", "i3"}; + + auto s = flushall() + + rpush("a", a) + + lrange("a") + + rpush("b", b) + + lrange("b") + + rpush("c", c) + + lrange("c") + + rpush("d", d) + + lrange("d") + + rpush("e", e) + + lrange("e") + + rpush("f", f) + + lrange("f") + + rpush("g", g) + + lrange("g") + + rpush("h", h) + + lrange("h") + + rpush("i", i) + + lrange("i") + ; + + send(std::move(s)); +} void example1() { std::list a @@ -40,6 +92,7 @@ void example1() }; auto s = ping() + + role() + flushall() + rpush("a", a) + lrange("a") @@ -74,11 +127,12 @@ void example2() net::io_context ioc; session::config cfg - { "127.0.0.1" // host - , "6379" // port + { { "127.0.0.1", "26377" + , "127.0.0.1", "26378" + , "127.0.0.1", "26379"} // Sentinel addresses + , "mymaster" // Instance name + , "master" // Instance role , 256 // Max pipeline size - , std::chrono::milliseconds {500} // Connection retry - , {} // Sentinel addresses , log::level::info }; @@ -119,8 +173,9 @@ void example3() int main(int argc, char* argv[]) { - example1(); - //example2(); + //example1(); + example2(); //example3(); + //rpush_ex(); }