2
0
mirror of https://github.com/boostorg/redis.git synced 2026-02-17 14:02:13 +00:00

Merge branch 'master' of https://github.com/mzimbres/aedis into master

This commit is contained in:
Marcelo Zimbres
2020-11-21 23:04:04 +01:00
4 changed files with 83 additions and 185 deletions

View File

@@ -21,7 +21,7 @@ add_library(aedis INTERFACE)
add_library(aedis::aedis ALIAS aedis)
target_link_libraries(aedis INTERFACE fmt::fmt Boost::system)
target_compile_features(aedis INTERFACE cxx_std_17)
target_compile_features(aedis INTERFACE cxx_std_20)
target_include_directories(aedis INTERFACE
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>

217
aedis.hpp
View File

@@ -664,6 +664,14 @@ struct instance {
std::string name;
};
inline
auto operator==(instance const& a, instance const& b)
{
return a.host == b.host
&& a.port == b.port
&& a.name == b.name;
}
// Still on development.
template <class AsyncReadStream>
class sentinel_op2 {
@@ -683,7 +691,7 @@ private:
};
struct impl {
AsyncReadStream stream;
AsyncReadStream& stream;
resp::buffer buffer;
resp::response res;
instance* inst;
@@ -692,8 +700,10 @@ private:
config cfg;
ip::tcp::resolver resv;
impl(config c, instance* i)
: inst(i)
impl(AsyncReadStream& s, config c, instance* i)
: stream(s)
, resv(s.get_executor())
, inst(i)
, cfg(c)
{}
};
@@ -701,8 +711,8 @@ private:
std::shared_ptr<impl> impl_;
public:
sentinel_op2(config cfg, instance* inst)
: impl_(std::make_shared<impl>(cfg, inst))
sentinel_op2(AsyncReadStream& stream, config cfg, instance* inst)
: impl_(std::make_shared<impl>(stream, cfg, inst))
{
auto const n = std::size(cfg.sentinels);
if (n == 0 || (n % 2 != 0))
@@ -718,8 +728,8 @@ public:
case op_state::on_connect:
{
auto const n = std::size(impl_->cfg.sentinels) / 2;
unsigned i = 0;
unsigned i = 0;
for (i = 0; i < n; ++i) {
auto const res = impl_->resv
.resolve( impl_->cfg.sentinels[2 * i + 0]
@@ -747,6 +757,7 @@ public:
// std::swap(cfg.sentinels[0], cfg.sentinels[2 * i + 0]);
// std::swap(cfg.sentinels[1], cfg.sentinels[2 * i + 1]);
impl_->inst->name = impl_->cfg.name;
impl_->cmd = sentinel("get-master-addr-by-name", impl_->inst->name);
impl_->opstate = op_state::on_write;
net::async_write( impl_->stream
@@ -780,101 +791,25 @@ public:
}
}
};
template <class AsyncReadStream>
class sentinel_op {
public:
struct config {
// A list of redis sentinels e.g. ip1 port1 ip2 port2 ...
std::vector<std::string> sentinels {"127.0.0.1", "26379"};
std::string name {"mymaster"};
std::string role {"master"};
};
private:
enum class op_state
{ starting
, writing
, waiting
};
struct impl {
AsyncReadStream& stream;
resp::buffer buffer;
resp::response res;
instance* inst;
op_state opstate {op_state::starting};
std::string cmd;
impl(AsyncReadStream& s, instance* i)
: stream {s}
, inst {i}
{}
};
std::shared_ptr<impl> impl_;
public:
sentinel_op(
AsyncReadStream& stream,
instance* inst)
: impl_(std::make_shared<impl>(stream, inst))
{
}
template <class Self>
void operator()( Self& self
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
switch (impl_->opstate) {
case op_state::starting:
{
impl_->cmd = sentinel("get-master-addr-by-name", impl_->inst->name);
impl_->opstate = op_state::writing;
net::async_write( impl_->stream
, net::buffer(impl_->cmd)
, std::move(self));
} break;
case op_state::writing:
{
if (ec)
return self.complete(ec);
impl_->opstate = op_state::waiting;
resp::async_read(
impl_->stream,
impl_->buffer,
impl_->res,
std::move(self));
} break;
case op_state::waiting:
{
auto n = std::size(impl_->res.res);
if (n > 1) {
impl_->inst->host = impl_->res.res[0];
impl_->inst->port = impl_->res.res[1];
}
self.complete(ec);
} break;
default:
{
}
}
}
};
template <class AsyncReadStream, class CompletionToken>
auto
async_get_instance( AsyncReadStream& s
, instance* p2
, CompletionToken&& token)
template <
class AsyncReadStream,
class CompletionToken =
net::default_completion_token_t<typename AsyncReadStream::executor_type>
>
auto async_get_instance2(
AsyncReadStream& stream,
typename sentinel_op2<AsyncReadStream>::config cfg,
instance& inst,
CompletionToken&& token =
net::default_completion_token_t<typename AsyncReadStream::executor_type>{})
{
return net::async_compose
< CompletionToken
, void(boost::system::error_code)
>(sentinel_op<AsyncReadStream> {s, p2}, token, s);
>(sentinel_op2<AsyncReadStream> {stream, cfg, &inst},
token,
stream);
}
template <class AsyncReadStream>
@@ -886,10 +821,10 @@ public:
std::function<void( boost::system::error_code const&
, std::vector<std::string>)>;
using sentinel_config = typename sentinel_op<AsyncReadStream>::config;
using sentinel_config = typename sentinel_op2<AsyncReadStream>::config;
struct config {
using sentinel_config = typename sentinel_op<AsyncReadStream>::config;
using sentinel_config = typename sentinel_op2<AsyncReadStream>::config;
sentinel_config sentinel;
int max_pipeline_size {256};
log::level log_filter {log::level::debug};
@@ -906,14 +841,12 @@ private:
ip::tcp::resolver resolver_;
AsyncReadStream stream_;
net::steady_timer timer_;
resp::buffer buffer_;
resp::response res_;
std::queue<queue_item> msg_queue_;
int pipeline_size_ = 0;
long long pipeline_id_ = 0;
instance instance_;
int sentinel_idx_ = 0;
bool disable_reconnect_ = false;
msg_handler_type msg_handler_ = [this](auto ec, auto const& res)
@@ -1121,56 +1054,6 @@ private:
}
}
void
on_sentinel_conn( boost::system::error_code ec
, net::ip::tcp::endpoint const& endpoint)
{
if (ec) {
log::write( cfg_.log_filter
, log::level::warning
, "{0}/on_sentinel_conn: {1}. Endpoint: {2}"
, id_
, ec.message()
, endpoint);
// Ask the next sentinel only if we did not try them all yet.
++sentinel_idx_;
if ((2 * sentinel_idx_) == std::size(cfg_.sentinel.sentinels)) {
log::write( cfg_.log_filter
, log::level::warning
, "{0}/No sentinel knows the redis instance address."
, id_);
return;
}
run(); // Tries the next sentinel.
return;
}
// The redis documentation recommends to put the first sentinel that
// replies in the start of the list. See
// https://redis.io/topics/sentinel-clients
if (sentinel_idx_ != 0) {
auto const r = sentinel_idx_;
std::swap(cfg_.sentinel.sentinels[0], cfg_.sentinel.sentinels[2 * r]);
std::swap(cfg_.sentinel.sentinels[1], cfg_.sentinel.sentinels[2 * r + 1]);
sentinel_idx_ = 0;
}
log::write( cfg_.log_filter
, log::level::info
, "{0}/Success connecting to sentinel {1}"
, id_
, endpoint);
auto g = [this](auto ec)
{ on_instance(instance_.host, instance_.port, ec); };
instance_.name = cfg_.sentinel.name;
async_get_instance(stream_, &instance_, std::move(g));
}
public:
session( net::io_context& ioc
, config cfg
@@ -1179,7 +1062,6 @@ public:
, cfg_ {std::move(cfg)}
, resolver_ {ioc}
, stream_ {ioc}
, timer_ {ioc, std::chrono::steady_clock::time_point::max()}
{
if (cfg_.max_pipeline_size < 1)
cfg_.max_pipeline_size = 1;
@@ -1224,37 +1106,10 @@ public:
void run()
{
auto const n = std::size(cfg_.sentinel.sentinels);
auto f = [this](auto ec)
{ on_instance(instance_.host, instance_.port, ec); };
if (n == 0 || (n % 2 != 0)) {
log::write( cfg_.log_filter
, log::level::warning
, "{0}/run: Incompatible sentinels array size: {1}"
, id_, n);
return;
}
auto const r = sentinel_idx_;
boost::system::error_code ec;
auto res = resolver_
.resolve( cfg_.sentinel.sentinels[2 * r]
, cfg_.sentinel.sentinels[2 * r + 1]
, ec);
if (ec) {
log::write( cfg_.log_filter
, log::level::warning
, "{0}/run: Can't resolve sentinel: {1}."
, id_
, ec.message());
return;
}
auto f = [this](auto ec, auto iter)
{ on_sentinel_conn(ec, iter); };
net::async_connect(stream_, res, f);
async_get_instance2(stream_, cfg_.sentinel, instance_, f);
}
void disable_reconnect()

View File

@@ -140,6 +140,20 @@ awaitable<void> example4()
}
}
awaitable<void> example5()
{
tcp_socket socket {co_await this_coro::executor};
sentinel_op2<tcp_socket>::config cfg
{ {"127.0.0.1", "26379"}
, {"mymaster"}
, {"master"}
};
instance inst;
co_await async_get_instance2(socket, cfg, inst);
}
int main()
{
io_context ioc {1};
@@ -147,7 +161,7 @@ int main()
co_spawn(ioc, example2(), detached);
co_spawn(ioc, example3(), detached);
co_spawn(ioc, example4(), detached);
co_spawn(ioc, example5(), detached);
ioc.run();
}

View File

@@ -207,10 +207,33 @@ void send(std::string cmd)
ioc.run();
}
int main(int argc, char* argv[])
void test5()
{
rpush_lrange();
net::io_context ioc {1};
tcp::socket socket {ioc};
sentinel_op2<tcp::socket>::config cfg
{ {"127.0.0.1", "26377", "127.0.0.1", "26378", "127.0.0.1", "26379"}
, {"mymaster"}
, {"master"}
};
instance inst;
auto f = [&](auto ec)
{
instance expected {"127.0.0.1", "6379", "mymaster"};
if (inst == expected)
std::cout << "Success: async_get_instance2" << std::endl;
else
std::cout << "Error: async_get_instance2" << std::endl;
};
async_get_instance2(socket, cfg, inst, f);
ioc.run();
}
void offline()
{
// Redis answer - Expected vector.
std::vector<std::pair<std::string, std::vector<std::string>>> payloads
{ {{"+OK\r\n"}, {"OK"}}
@@ -230,7 +253,13 @@ int main(int argc, char* argv[])
else
std::cout << "Success: Offline tests." << std::endl;
}
}
int main(int argc, char* argv[])
{
rpush_lrange();
test5();
offline();
send(ping());
}