2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-19 04:42:09 +00:00

Fixes usage of executors and adds test.

This commit is contained in:
Marcelo Zimbres
2022-04-22 15:30:36 +02:00
parent a5376bc05f
commit fd0cae92ee
2 changed files with 56 additions and 13 deletions

View File

@@ -84,8 +84,7 @@ public:
* \param cfg Configuration parameters.
*/
client(boost::asio::any_io_executor ex, config cfg = config{})
: socket_{std::make_shared<AsyncReadWriteStream>(ex)}
, read_timer_{ex}
: read_timer_{ex}
, write_timer_{ex}
, wait_write_timer_{ex}
, resv_{ex}
@@ -94,7 +93,7 @@ public:
}
/// Returns the executor.
auto get_executor() {return socket_->get_executor();}
auto get_executor() {return read_timer_.get_executor();}
/** @brief Adds a command to the output command queue.
*
@@ -279,7 +278,7 @@ public:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::run_op<client>{this}, token, *socket_, read_timer_, write_timer_, wait_write_timer_);
>(detail::run_op<client>{this}, token, /* *socket_,*/ read_timer_, write_timer_, wait_write_timer_);
}
/// Set the read handler.
@@ -422,7 +421,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::resolve_op<client>{this}, token, *socket_);
>(detail::resolve_op<client>{this}, token, resv_.get_executor());
}
// Connects the socket to one of the endpoints in endpoints_ and
@@ -434,7 +433,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::connect_op<client>{this}, token, *socket_);
>(detail::connect_op<client>{this}, token, write_timer_.get_executor());
}
// Reads a complete resp3 response from the socket using the
@@ -448,7 +447,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::read_op<client>{this}, token, *socket_);
>(detail::read_op<client>{this}, token, read_timer_.get_executor());
}
// Loops on async_read described above.
@@ -459,7 +458,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::reader_op<client, Command>{this}, token, *socket_);
>(detail::reader_op<client, Command>{this}, token, read_timer_.get_executor());
}
// Write with a timeout.
@@ -471,7 +470,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::write_op<client>{this}, token, *socket_, write_timer_);
>(detail::write_op<client>{this}, token, write_timer_);
}
template <class CompletionToken = default_completion_token_type>
@@ -481,7 +480,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::writer_op<client>{this}, token, *socket_, wait_write_timer_);
>(detail::writer_op<client>{this}, token, wait_write_timer_);
}
void on_reader_exit()

View File

@@ -204,6 +204,8 @@ public:
, adapter_{adapt(counter)}
{}
void on_read(command) {}
void on_write()
{
if (counter == 0) {
@@ -230,9 +232,10 @@ private:
adapter_t<int> adapter_;
};
template <class Receiver>
struct reconnect {
client_type db;
receiver5 recv;
Receiver recv;
boost::asio::steady_timer timer;
net::coroutine coro;
@@ -241,6 +244,7 @@ struct reconnect {
, recv{db}
, timer{ex}
{
db.set_read_handler([this](auto cmd, auto){recv.on_read(cmd);});
db.set_write_handler([this](auto){recv.on_write();});
db.set_resp3_handler([this](auto a, auto b, auto c){recv.on_resp3(a, b, c);});
}
@@ -267,12 +271,51 @@ struct reconnect {
void test_reconnect()
{
net::io_context ioc;
reconnect rec{ioc.get_executor()};
reconnect<receiver5> rec{ioc.get_executor()};
rec.on_event({});
ioc.run();
}
// TODO: test_reconnect2() using on_read instead of on_write.
struct receiver6 {
public:
int counter = 0;
receiver6(client_type& db)
: db_{&db}
, adapter_{adapt(counter)}
{}
void on_write() {}
void on_read(command cmd)
{
if (cmd == command::hello) {
db_->send(command::get, "receiver6-key");
if (counter == 0)
db_->send(command::del, "receiver6-key");
db_->send(command::incr, "receiver6-key");
db_->send(command::quit);
return;
}
}
void on_resp3(command cmd, node<boost::string_view> const& nd, boost::system::error_code& ec)
{
if (cmd == command::incr)
adapter_(nd, ec);
}
private:
client_type* db_;
adapter_t<int> adapter_;
};
void test_reconnect2()
{
net::io_context ioc;
reconnect<receiver6> rec{ioc.get_executor()};
rec.on_event({});
ioc.run();
}
std::vector<node_type> gresp;
@@ -643,6 +686,7 @@ int main()
test_push();
test_push2();
test_reconnect();
test_reconnect2();
//net::io_context ioc {1};
//tcp::resolver resv(ioc);