2
0
mirror of https://github.com/boostorg/redis.git synced 2026-01-22 05:42:10 +00:00

Compare commits

...

1 Commits

Author SHA1 Message Date
Marcelo Zimbres
f506e1baee Adds a buffer that has a tolerance. 2023-08-07 21:29:31 +02:00
3 changed files with 116 additions and 9 deletions

View File

@@ -0,0 +1,99 @@
#ifndef BOOST_REDIS_BUFFER_HPP
#define BOOST_REDIS_BUFFER_HPP
#include <limits>
#include <string>
#include <algorithm>
#include <boost/asio/buffer.hpp>
#include <boost/throw_exception.hpp>
namespace boost::redis::detail
{
template <class Elem, class Traits, class Allocator>
class dynamic_string_buffer {
public:
using const_buffers_type = asio::const_buffer;
using mutable_buffers_type = asio::mutable_buffer;
explicit
dynamic_string_buffer(
std::basic_string<Elem, Traits, Allocator>& s,
std::size_t maximum_size) noexcept
: string_(s)
, max_size_(maximum_size)
{
}
void clear()
{
consumed_ = 0;
}
std::size_t size() const noexcept
{
return (std::min)(string_.size() - consumed_, max_size_);
}
bool empty() const noexcept
{
return size() == 0;
}
std::size_t max_size() const noexcept
{
return max_size_;
}
char const& front() const noexcept
{
return string_.at(consumed_);
}
std::size_t capacity() const noexcept
{
return (std::min)(string_.capacity(), max_size());
}
mutable_buffers_type data(std::size_t pos, std::size_t n) noexcept
{
return mutable_buffers_type(asio::buffer(asio::buffer(string_, max_size_) + pos + consumed_, n));
}
const_buffers_type data(std::size_t pos, std::size_t n) const noexcept
{
return const_buffers_type(asio::buffer(asio::buffer(string_, max_size_) + pos + consumed_, n));
}
void grow(std::size_t n)
{
if (string_.size() > max_size_ || (string_.size() + n) > max_size_)
BOOST_THROW_EXCEPTION(std::length_error{"dynamic_string_buffer too long"});
string_.resize(string_.size() + n);
}
void shrink(std::size_t n)
{
string_.resize(n > (string_.size() - consumed_) ? consumed_ : string_.size() - n);
}
void consume(std::size_t n, std::size_t tolerance = 100'000)
{
consumed_ += n;
if (consumed_ > tolerance) {
string_.erase(0, consumed_);
consumed_ = 0;
}
}
private:
std::basic_string<Elem, Traits, Allocator>& string_;
std::size_t consumed_ = 0;
std::size_t const max_size_;
};
}
#endif // BOOST_REDIS_BUFFER_HPP

View File

@@ -16,6 +16,7 @@
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/detail/buffer.hpp>
#include <boost/system.hpp>
#include <boost/asio/basic_stream_socket.hpp>
@@ -111,7 +112,7 @@ public:
// If we detect a push in the middle of a request we have
// to hand it to the push consumer. To do that we need
// some data in the read bufer.
if (conn_->read_buffer_.empty()) {
if (std::empty(conn_->dbuf_)) {
if (conn_->use_ssl()) {
BOOST_ASIO_CORO_YIELD
@@ -130,7 +131,7 @@ public:
// If the next request is a push we have to handle it to
// the receive_op wait for it to be done and continue.
if (resp3::to_type(conn_->read_buffer_.front()) == resp3::type::push) {
if (resp3::to_type(conn_->dbuf_.front()) == resp3::type::push) {
BOOST_ASIO_CORO_YIELD
conn_->async_wait_receive(std::move(self));
BOOST_REDIS_CHECK_OP1(conn_->cancel(operation::run););
@@ -154,7 +155,7 @@ public:
return;
}
conn_->dbuf_.consume(n);
conn_->consume(n);
read_size_ += n;
BOOST_ASSERT(cmds_ != 0);
@@ -204,7 +205,7 @@ struct receive_op {
return;
}
conn_->dbuf_.consume(n);
conn_->consume(n);
if (!conn_->is_next_push()) {
conn_->read_op_timer_.cancel();
@@ -329,6 +330,7 @@ struct run_op {
{
conn->write_buffer_.clear();
conn->read_buffer_.clear();
conn->dbuf_.clear();
BOOST_ASIO_CORO_YIELD
asio::experimental::make_parallel_group(
@@ -408,7 +410,7 @@ struct reader_op {
bool as_push() const
{
return
(resp3::to_type(conn->read_buffer_.front()) == resp3::type::push)
(resp3::to_type(conn->dbuf_.front()) == resp3::type::push)
|| conn->reqs_.empty()
|| (!conn->reqs_.empty() && conn->reqs_.front()->get_number_of_commands() == 0)
|| !conn->is_waiting_response(); // Added to deal with MONITOR.
@@ -457,7 +459,7 @@ struct reader_op {
// - https://github.com/redis/redis/issues/11784
// - https://github.com/redis/redis/issues/6426
//
BOOST_ASSERT(!conn->read_buffer_.empty());
BOOST_ASSERT(!conn->dbuf_.empty());
if (as_push()) {
BOOST_ASIO_CORO_YIELD
conn->async_wait_receive(std::move(self));
@@ -890,12 +892,17 @@ private:
bool is_next_push() const noexcept
{
return !read_buffer_.empty() && (resp3::to_type(read_buffer_.front()) == resp3::type::push);
return !dbuf_.empty() && (resp3::to_type(dbuf_.front()) == resp3::type::push);
}
auto is_open() const noexcept { return stream_->next_layer().is_open(); }
auto& lowest_layer() noexcept { return stream_->lowest_layer(); }
void consume(std::size_t n)
{
dbuf_.consume(n, 1'000'000);
}
asio::ssl::context ctx_;
std::unique_ptr<next_layer_type> stream_;
@@ -907,8 +914,9 @@ private:
timer_type read_op_timer_;
runner_type runner_;
using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
using dyn_buffer_type = redis::detail::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
std::size_t const read_buffer_consume_tolerance_ = 1'000'000; // 1Mb
std::string read_buffer_;
dyn_buffer_type dbuf_;
std::string write_buffer_;

View File

@@ -89,7 +89,7 @@ auto async_echo_stress() -> net::awaitable<void>
int const sessions = 500;
// The number of pings that will be sent by each session.
int const msgs = 1000;
int const msgs = 1'000;
// The number of publishes that will be sent by each session with
// each message.