mirror of
https://github.com/boostorg/redis.git
synced 2026-01-19 04:42:09 +00:00
Simplifies the connect operations.
This commit is contained in:
@@ -12,9 +12,8 @@
|
||||
#include <boost/asio/compose.hpp>
|
||||
#include <boost/asio/connect.hpp>
|
||||
#include <boost/asio/coroutine.hpp>
|
||||
#include <boost/asio/experimental/parallel_group.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/cancel_after.hpp>
|
||||
#include <string>
|
||||
#include <chrono>
|
||||
|
||||
@@ -30,65 +29,29 @@ struct connect_op {
|
||||
|
||||
template <class Self>
|
||||
void operator()( Self& self
|
||||
, std::array<std::size_t, 2> const& order = {}
|
||||
, system::error_code const& ec1 = {}
|
||||
, asio::ip::tcp::endpoint const& ep= {}
|
||||
, system::error_code const& ec2 = {})
|
||||
, system::error_code const& ec = {}
|
||||
, asio::ip::tcp::endpoint const& ep= {})
|
||||
{
|
||||
BOOST_ASIO_CORO_REENTER (coro)
|
||||
{
|
||||
ctor_->timer_.expires_after(ctor_->timeout_);
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
asio::experimental::make_parallel_group(
|
||||
[this](auto token)
|
||||
{
|
||||
auto f = [](system::error_code const&, auto const&) { return true; };
|
||||
return asio::async_connect(*stream, *res_, f, token);
|
||||
},
|
||||
[this](auto token) { return ctor_->timer_.async_wait(token);}
|
||||
).async_wait(
|
||||
asio::experimental::wait_for_one(),
|
||||
std::move(self));
|
||||
asio::async_connect(*stream, *res_,
|
||||
[](system::error_code const&, auto const&) { return true; },
|
||||
asio::cancel_after(ctor_->timeout_, std::move(self)));
|
||||
|
||||
if (is_cancelled(self)) {
|
||||
self.complete(asio::error::operation_aborted);
|
||||
return;
|
||||
ctor_->endpoint_ = ep;
|
||||
|
||||
if (ec == asio::error::operation_aborted) {
|
||||
ec == error::connect_timeout;
|
||||
}
|
||||
|
||||
switch (order[0]) {
|
||||
case 0: {
|
||||
ctor_->endpoint_ = ep;
|
||||
self.complete(ec1);
|
||||
} break;
|
||||
case 1:
|
||||
{
|
||||
if (ec2) {
|
||||
self.complete(ec2);
|
||||
} else {
|
||||
self.complete(error::connect_timeout);
|
||||
}
|
||||
} break;
|
||||
|
||||
default: BOOST_ASSERT(false);
|
||||
}
|
||||
self.complete(ec);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <class Executor>
|
||||
class connector {
|
||||
public:
|
||||
using timer_type =
|
||||
asio::basic_waitable_timer<
|
||||
std::chrono::steady_clock,
|
||||
asio::wait_traits<std::chrono::steady_clock>,
|
||||
Executor>;
|
||||
|
||||
connector(Executor ex)
|
||||
: timer_{ex}
|
||||
{}
|
||||
|
||||
void set_config(config const& cfg)
|
||||
{ timeout_ = cfg.connect_timeout; }
|
||||
|
||||
@@ -102,20 +65,7 @@ public:
|
||||
return asio::async_compose
|
||||
< CompletionToken
|
||||
, void(system::error_code)
|
||||
>(connect_op<connector, Stream>{this, &stream, &res}, token, timer_);
|
||||
}
|
||||
|
||||
std::size_t cancel(operation op)
|
||||
{
|
||||
switch (op) {
|
||||
case operation::connect:
|
||||
case operation::all:
|
||||
timer_.cancel();
|
||||
break;
|
||||
default: /* ignore */;
|
||||
}
|
||||
|
||||
return 0;
|
||||
>(connect_op<connector, Stream>{this, &stream, &res}, token);
|
||||
}
|
||||
|
||||
auto const& endpoint() const noexcept { return endpoint_;}
|
||||
@@ -123,7 +73,6 @@ public:
|
||||
private:
|
||||
template <class, class> friend struct connect_op;
|
||||
|
||||
timer_type timer_;
|
||||
std::chrono::steady_clock::duration timeout_ = std::chrono::seconds{2};
|
||||
asio::ip::tcp::endpoint endpoint_;
|
||||
};
|
||||
|
||||
@@ -40,7 +40,9 @@ struct resolve_op {
|
||||
|
||||
resv_->results_ = res;
|
||||
|
||||
// TODO: map operation_canceled into error::resolve_timeout
|
||||
if (ec == asio::error::operation_aborted) {
|
||||
ec == error::resolve_timeout;
|
||||
}
|
||||
self.complete(ec);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
#include <boost/redis/detail/resolver.hpp>
|
||||
#include <boost/redis/detail/handshaker.hpp>
|
||||
#include <boost/asio/compose.hpp>
|
||||
#include <boost/asio/connect.hpp>
|
||||
#include <boost/asio/coroutine.hpp>
|
||||
#include <boost/asio/experimental/parallel_group.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
@@ -165,7 +164,6 @@ class runner {
|
||||
public:
|
||||
runner(Executor ex, config cfg)
|
||||
: resv_{ex}
|
||||
, ctor_{ex}
|
||||
, hsher_{ex}
|
||||
, health_checker_{ex}
|
||||
, cfg_{cfg}
|
||||
@@ -174,7 +172,6 @@ public:
|
||||
std::size_t cancel(operation op)
|
||||
{
|
||||
resv_.cancel(op);
|
||||
ctor_.cancel(op);
|
||||
hsher_.cancel(op);
|
||||
health_checker_.cancel(op);
|
||||
return 0U;
|
||||
@@ -202,10 +199,8 @@ public:
|
||||
|
||||
private:
|
||||
using resolver_type = resolver<Executor>;
|
||||
using connector_type = connector<Executor>;
|
||||
using handshaker_type = detail::handshaker<Executor>;
|
||||
using health_checker_type = health_checker<Executor>;
|
||||
using timer_type = typename connector_type::timer_type;
|
||||
|
||||
template <class, class, class> friend class runner_op;
|
||||
template <class, class, class> friend struct hello_op;
|
||||
@@ -245,7 +240,7 @@ private:
|
||||
}
|
||||
|
||||
resolver_type resv_;
|
||||
connector_type ctor_;
|
||||
connector ctor_;
|
||||
handshaker_type hsher_;
|
||||
health_checker_type health_checker_;
|
||||
request hello_req_;
|
||||
|
||||
Reference in New Issue
Block a user