mirror of
https://github.com/boostorg/redis.git
synced 2026-01-22 05:42:10 +00:00
Compare commits
1 Commits
develop
...
127-consum
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f506e1baee |
99
include/boost/redis/detail/buffer.hpp
Normal file
99
include/boost/redis/detail/buffer.hpp
Normal file
@@ -0,0 +1,99 @@
|
||||
#ifndef BOOST_REDIS_BUFFER_HPP
|
||||
#define BOOST_REDIS_BUFFER_HPP
|
||||
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <algorithm>
|
||||
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/throw_exception.hpp>
|
||||
|
||||
namespace boost::redis::detail
|
||||
{
|
||||
|
||||
template <class Elem, class Traits, class Allocator>
|
||||
class dynamic_string_buffer {
|
||||
public:
|
||||
using const_buffers_type = asio::const_buffer;
|
||||
using mutable_buffers_type = asio::mutable_buffer;
|
||||
|
||||
explicit
|
||||
dynamic_string_buffer(
|
||||
std::basic_string<Elem, Traits, Allocator>& s,
|
||||
std::size_t maximum_size) noexcept
|
||||
: string_(s)
|
||||
, max_size_(maximum_size)
|
||||
{
|
||||
}
|
||||
|
||||
void clear()
|
||||
{
|
||||
consumed_ = 0;
|
||||
}
|
||||
|
||||
std::size_t size() const noexcept
|
||||
{
|
||||
return (std::min)(string_.size() - consumed_, max_size_);
|
||||
}
|
||||
|
||||
bool empty() const noexcept
|
||||
{
|
||||
return size() == 0;
|
||||
}
|
||||
|
||||
std::size_t max_size() const noexcept
|
||||
{
|
||||
return max_size_;
|
||||
}
|
||||
|
||||
char const& front() const noexcept
|
||||
{
|
||||
return string_.at(consumed_);
|
||||
}
|
||||
|
||||
std::size_t capacity() const noexcept
|
||||
{
|
||||
return (std::min)(string_.capacity(), max_size());
|
||||
}
|
||||
|
||||
mutable_buffers_type data(std::size_t pos, std::size_t n) noexcept
|
||||
{
|
||||
return mutable_buffers_type(asio::buffer(asio::buffer(string_, max_size_) + pos + consumed_, n));
|
||||
}
|
||||
|
||||
const_buffers_type data(std::size_t pos, std::size_t n) const noexcept
|
||||
{
|
||||
return const_buffers_type(asio::buffer(asio::buffer(string_, max_size_) + pos + consumed_, n));
|
||||
}
|
||||
|
||||
void grow(std::size_t n)
|
||||
{
|
||||
if (string_.size() > max_size_ || (string_.size() + n) > max_size_)
|
||||
BOOST_THROW_EXCEPTION(std::length_error{"dynamic_string_buffer too long"});
|
||||
|
||||
string_.resize(string_.size() + n);
|
||||
}
|
||||
|
||||
void shrink(std::size_t n)
|
||||
{
|
||||
string_.resize(n > (string_.size() - consumed_) ? consumed_ : string_.size() - n);
|
||||
}
|
||||
|
||||
void consume(std::size_t n, std::size_t tolerance = 100'000)
|
||||
{
|
||||
consumed_ += n;
|
||||
if (consumed_ > tolerance) {
|
||||
string_.erase(0, consumed_);
|
||||
consumed_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::basic_string<Elem, Traits, Allocator>& string_;
|
||||
std::size_t consumed_ = 0;
|
||||
std::size_t const max_size_;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif // BOOST_REDIS_BUFFER_HPP
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <boost/redis/resp3/type.hpp>
|
||||
#include <boost/redis/config.hpp>
|
||||
#include <boost/redis/detail/runner.hpp>
|
||||
#include <boost/redis/detail/buffer.hpp>
|
||||
|
||||
#include <boost/system.hpp>
|
||||
#include <boost/asio/basic_stream_socket.hpp>
|
||||
@@ -111,7 +112,7 @@ public:
|
||||
// If we detect a push in the middle of a request we have
|
||||
// to hand it to the push consumer. To do that we need
|
||||
// some data in the read bufer.
|
||||
if (conn_->read_buffer_.empty()) {
|
||||
if (std::empty(conn_->dbuf_)) {
|
||||
|
||||
if (conn_->use_ssl()) {
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
@@ -130,7 +131,7 @@ public:
|
||||
|
||||
// If the next request is a push we have to handle it to
|
||||
// the receive_op wait for it to be done and continue.
|
||||
if (resp3::to_type(conn_->read_buffer_.front()) == resp3::type::push) {
|
||||
if (resp3::to_type(conn_->dbuf_.front()) == resp3::type::push) {
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn_->async_wait_receive(std::move(self));
|
||||
BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
|
||||
@@ -154,7 +155,7 @@ public:
|
||||
return;
|
||||
}
|
||||
|
||||
conn_->dbuf_.consume(n);
|
||||
conn_->consume(n);
|
||||
read_size_ += n;
|
||||
|
||||
BOOST_ASSERT(cmds_ != 0);
|
||||
@@ -204,7 +205,7 @@ struct receive_op {
|
||||
return;
|
||||
}
|
||||
|
||||
conn_->dbuf_.consume(n);
|
||||
conn_->consume(n);
|
||||
|
||||
if (!conn_->is_next_push()) {
|
||||
conn_->read_op_timer_.cancel();
|
||||
@@ -329,6 +330,7 @@ struct run_op {
|
||||
{
|
||||
conn->write_buffer_.clear();
|
||||
conn->read_buffer_.clear();
|
||||
conn->dbuf_.clear();
|
||||
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
asio::experimental::make_parallel_group(
|
||||
@@ -408,7 +410,7 @@ struct reader_op {
|
||||
bool as_push() const
|
||||
{
|
||||
return
|
||||
(resp3::to_type(conn->read_buffer_.front()) == resp3::type::push)
|
||||
(resp3::to_type(conn->dbuf_.front()) == resp3::type::push)
|
||||
|| conn->reqs_.empty()
|
||||
|| (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)
|
||||
|| !conn->is_waiting_response(); // Added to deal with MONITOR.
|
||||
@@ -457,7 +459,7 @@ struct reader_op {
|
||||
// - https://github.com/redis/redis/issues/11784
|
||||
// - https://github.com/redis/redis/issues/6426
|
||||
//
|
||||
BOOST_ASSERT(!conn->read_buffer_.empty());
|
||||
BOOST_ASSERT(!conn->dbuf_.empty());
|
||||
if (as_push()) {
|
||||
BOOST_ASIO_CORO_YIELD
|
||||
conn->async_wait_receive(std::move(self));
|
||||
@@ -890,12 +892,17 @@ private:
|
||||
|
||||
bool is_next_push() const noexcept
|
||||
{
|
||||
return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push);
|
||||
return !dbuf_.empty() && (resp3::to_type(dbuf_.front()) == resp3::type::push);
|
||||
}
|
||||
|
||||
auto is_open() const noexcept { return stream_->next_layer().is_open(); }
|
||||
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
|
||||
|
||||
void consume(std::size_t n)
|
||||
{
|
||||
dbuf_.consume(n, 1'000'000);
|
||||
}
|
||||
|
||||
asio::ssl::context ctx_;
|
||||
std::unique_ptr<next_layer_type> stream_;
|
||||
|
||||
@@ -907,8 +914,9 @@ private:
|
||||
timer_type read_op_timer_;
|
||||
runner_type runner_;
|
||||
|
||||
using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
|
||||
using dyn_buffer_type = redis::detail::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
|
||||
|
||||
std::size_t const read_buffer_consume_tolerance_ = 1'000'000; // 1Mb
|
||||
std::string read_buffer_;
|
||||
dyn_buffer_type dbuf_;
|
||||
std::string write_buffer_;
|
||||
|
||||
@@ -89,7 +89,7 @@ auto async_echo_stress() -> net::awaitable<void>
|
||||
int const sessions = 500;
|
||||
|
||||
// The number of pings that will be sent by each session.
|
||||
int const msgs = 1000;
|
||||
int const msgs = 1'000;
|
||||
|
||||
// The number of publishes that will be sent by each session with
|
||||
// each message.
|
||||
|
||||
Reference in New Issue
Block a user