mirror of https://github.com/boostorg/redis.git synced 2026-01-29 07:42:37 +00:00

Compare commits


7 Commits

Author SHA1 Message Date
Marcelo Zimbres
394bdf5b5e Increases the version. 2022-05-01 12:41:47 +02:00
Marcelo Zimbres
a4745a1a5d Better docs. 2022-05-01 12:34:21 +02:00
Marcelo Zimbres
b99da962d1 More improvements in the docs. 2022-05-01 10:48:03 +02:00
Marcelo Zimbres
172de6235c Code simplification. 2022-05-01 08:31:00 +02:00
Marcelo Zimbres
5e99a58685 Fixes the echo_server example. 2022-04-30 22:23:31 +02:00
Marcelo Zimbres
b952a2d2d8 Fixes async_op. 2022-04-30 22:21:36 +02:00
Marcelo Zimbres
16d1f8df24 Commit of
- Documentation.
- Avoids temporaries on connect.
- Removes many unnecessary instantiations of the serializer.
- Fixes ping operation.
2022-04-30 13:09:41 +02:00
15 changed files with 311 additions and 233 deletions


@@ -1 +1 @@
See file:///tmp/aedis/html/installation.html
See https://mzimbres.github.io/aedis/#using-aedis


@@ -32,7 +32,7 @@
@li Support for Redis [sentinel](https://redis.io/docs/manual/sentinel).
@li Sync and async API.
In addition to that, Aedis provides a high level client that offers the following functionality
In addition to that, Aedis provides a high-level client that offers the following functionality
@li Management of message queues.
@li Simplified handling of server pushes.
@@ -54,7 +54,7 @@
@li Get and return its old value.
@li Quit
The coroutine-based async implementation of the steps above look like
The coroutine-based asynchronous implementation of the steps above looks like
@code
net::awaitable<std::string> set(net::ip::tcp::endpoint ep)
@@ -95,9 +95,9 @@
\subsection requests Requests
As stated above, request are created by defining a storage object
As stated above, requests are created by defining a storage object
and a serializer that knows how to convert user data into valid
RESP3 wire-format. Redis request are composed of one or more
RESP3 wire-format. They are composed of one or more
commands (in Redis documentation they are called [pipelines](https://redis.io/topics/pipelining)),
which means users can add
as many commands to the request as they like, a feature that aids
@@ -192,21 +192,21 @@
---------|-------------------------------------|--------------
lpush | Number | https://redis.io/commands/lpush
lrange | Array | https://redis.io/commands/lrange
set | Simple string, null or blob string | https://redis.io/commands/set
get | Blob string | https://redis.io/commands/get
set | Simple-string, null or blob-string | https://redis.io/commands/set
get | Blob-string | https://redis.io/commands/get
smembers | Set | https://redis.io/commands/smembers
hgetall | Map | https://redis.io/commands/hgetall
Once the RESP3 type of a given response is known we can choose a
proper C++ data structure to receive it in. Fortunately, this is a
simple task for most types. The table below summarise the options
simple task for most types. The table below summarises the options
RESP3 type | C++ | Type
---------------|--------------------------------------------------------------|------------------
Simple string | \c std::string | Simple
Simple error | \c std::string | Simple
Blob string | \c std::string, \c std::vector | Simple
Blob error | \c std::string, \c std::vector | Simple
Simple-string | \c std::string | Simple
Simple-error | \c std::string | Simple
Blob-string | \c std::string, \c std::vector | Simple
Blob-error | \c std::string, \c std::vector | Simple
Number | `long long`, `int`, `std::size_t`, \c std::string | Simple
Double | `double`, \c std::string | Simple
Null | `boost::optional<T>` | Simple
@@ -216,7 +216,7 @@
Push | \c std::vector, \c std::map, \c std::unordered_map | Aggregate
Responses that contain nested aggregates or heterogeneous data
types will be given special treatment laster. As of this writing,
types will be given special treatment later. As of this writing,
not all RESP3 types are used by the Redis server, which means in
practice users will be concerned with a reduced subset of the
RESP3 specification. Now let us see some examples
@@ -252,10 +252,10 @@
co_await resp3::async_read(socket, dbuffer, adapt(vec));
@endcode
In other words, it is pretty straightforward, just pass the result
of \c adapt to the read function and make sure the response data
type fits in the type you are calling @c adapter(...) with. All
standard C++ containers are supported by aedis.
In other words, it is straightforward, just pass the result of \c
adapt to the read function and make sure the response data type is
compatible with the data structure you are calling @c adapter(...)
with. All standard C++ containers are supported by Aedis.
\subsubsection Optional
@@ -269,9 +269,8 @@
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(umap));
@endcode
Everything else stays pretty much the same, before accessing data,
users will have to check or assert the optional contains a
value.
Everything else stays the same; before accessing data, users will
have to check or assert that the optional contains a value.
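The check-before-access pattern can be sketched in plain C++ (illustrative only; \c get_reply is a hypothetical stand-in for a value filled by \c adapt on a RESP3 null, using \c std::optional in place of \c boost::optional):

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical stand-in for a value filled by adapt() when the server
// replies with RESP3 null: the optional stays empty.
std::optional<std::string> get_reply(bool key_exists)
{
   if (key_exists)
      return std::string{"some-value"};
   return std::nullopt; // RESP3 null.
}

// Check before access, as required above.
std::string value_or_placeholder(std::optional<std::string> const& opt)
{
   return opt.has_value() ? *opt : std::string{"(null)"};
}
```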
\subsubsection heterogeneous_aggregates Heterogeneous aggregates
@@ -279,7 +278,7 @@
contain heterogeneous data, for example, an array that contains
integers, strings, nested sets, etc. Aedis supports reading such
aggregates in a \c std::tuple efficiently as long as they
don't contain 2-order nested aggregates e.g. an array that
don't contain 3-order nested aggregates e.g. an array that
contains an array of arrays. For example, to read the response to
a \c hello command we can use the following response type.
@@ -321,11 +320,10 @@
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(trans));
@endcode
Note that we are not ignoring the response to the commands
themselves above but whether they have been successfully queued.
Only after @c exec is received Redis will execute them in
sequence. The response will then be sent in a single chunk to the
client.
Note that above we are not ignoring the response to the commands
themselves but whether they have been successfully queued. Only
after @c exec is received Redis will execute them in sequence and
send all responses together in an array.
\subsubsection Serialization
@@ -334,7 +332,7 @@
strings, for example
@code
sr.push(command::set, "key", "{"Server": "Redis"}"); // Unquoted string
sr.push(command::set, "key", "{"Server": "Redis"}"); // Unquoted for readability.
sr.push(command::get, "key")
@endcode
@@ -365,14 +363,13 @@
\subsubsection gen-case The general case
As already mentioned, there are cases where the response to Redis
As already mentioned, there are cases where responses to Redis
commands won't fit in the model presented above, some examples are
@li Commands (like \c set) whose responses don't have a fixed
RESP3 type. Expecting an \c int and receiving a blob string
RESP3 type. Expecting an \c int and receiving a blob-string
will result in an error.
@li RESP3 responses that contain three levels of (nested) aggregates can't be
read in STL containers.
@li RESP3 aggregates that contain nested aggregates can't be read in STL containers.
@li Transactions with a dynamic number of commands can't be read in a \c std::tuple.
To deal with these cases Aedis provides the \c resp3::node
@@ -443,30 +440,32 @@
As stated earlier, the low-level API is very useful for tasks that
can be performed with short-lived connections. Sometimes, however,
the need for long-lived connections becomes apparent and compeling
the need for long-lived connections becomes compelling
@li \b Server \b pushes: Short lived connections can't handle server pushes (e.g. https://redis.io/topics/client-side-caching and https://redis.io/topics/notifications).
@li \b Pubsub: Just like server pushes, to use Redis pubsub users need long lasting connections (https://redis.io/topics/pubsub).
@li \b Performance: Keep opening and closing connections impact performance.
@li \b Server \b pushes: Short-lived connections can't deal with server pushes, which means no [client side caching](https://redis.io/topics/client-side-caching), [notifications](https://redis.io/topics/notifications) and [pubsub](https://redis.io/topics/pubsub).
@li \b Performance: Repeatedly opening and closing connections impacts performance severely.
@li \b Pipeline: Code such as shown in \ref low-level-api doesn't support pipelines well since it can only send a fixed number of commands at a time. It misses important optimization opportunities (https://redis.io/topics/pipelining).
A serious implementation that supports the points listed above is
far from trivial as it involves the following async operations
far from trivial and involves many complex asynchronous operations
@li \c async_resolve: Resolve a hostname.
@li \c async_connect: Connect to Redis.
@li \c async_read: Performed in a loop as long as the connection lives.
@li \c async_write: Performed every time a new message is added.
@li \c async_wait: To time out all operations above if the server becomes unresponsive.
In addition to that
Notice that many of the operations above will run concurrently with each other. In addition to that
@li Each operation listed above requires timeout support.
@li \c async_write operations require management of the message queue to prevent concurrent writes.
@li Health checks must be sent periodically by the client to detect a dead or unresponsive server.
@li Recovery after a disconnection to avoid losing enqueued commands.
To avoid imposing this burden on every user, Aedis provides its
own implementation. The general form of a program that uses the
high-level api looks like this
Expecting users to implement these points themselves is
unrealistic and could result in code that performs poorly and
can't handle errors properly. To avoid all of that, Aedis
provides its own implementation. The general form of a program
that uses the high-level API looks like this
@code
int main()
@@ -477,7 +476,7 @@
auto recv = std::make_shared<receiver>(db);
db.set_receiver(recv);
db.async_run("127.0.0.1", "6379", [](auto ec){ std::cout << ec.message() << std::endl;});
db.async_run("127.0.0.1", "6379", [](auto ec){ ... });
ioc.run();
}
@@ -487,18 +486,49 @@
receiver. For example
@code
// Callbacks.
struct receiver {
void on_resp3(command cmd, node<boost::string_view> const& nd, boost::system::error_code& ec) { ... }
void on_resp3(command cmd, node<string_view> const& nd, error_code& ec) { ... }
void on_read(command cmd, std::size_t) { ... }
void on_write(std::size_t n) { ... }
void on_push(std::size_t n) { }
void on_write(std::size_t n) { ... }
};
@endcode
The functions in the receiver are callbacks that are called by the client class.
The functions in the receiver above are callbacks that will be
called when events arrive
@li \c on_resp3: Called when a new chunk of resp3 data is parsed.
@li \c on_read: Called after the response to a command has been successfully read.
@li \c on_push: Called when a server push is received.
@li \c on_write: Called after a request has been successfully written to the stream.
The callbacks above are never called on errors; instead, the \c
async_run function returns. Reconnection is also supported, for
example
@code
net::awaitable<void> run(std::shared_ptr<client_type> db)
{
auto ex = co_await net::this_coro::executor;
boost::asio::steady_timer timer{ex};
for (error_code ec;;) {
co_await db->async_run("127.0.0.1", "6379", redirect_error(use_awaitable, ec));
// Log the error.
std::clog << ec.message() << std::endl;
// Wait two seconds and try again.
timer.expires_after(std::chrono::seconds{2});
co_await timer.async_wait(redirect_error(use_awaitable, ec));
}
}
@endcode
When reconnecting, the client will recover requests that haven't
been sent to Redis yet.
\subsection high-level-sending-cmds Sending commands
@@ -518,10 +548,10 @@
@endcode
The \c send functions in this case will add commands to the output
queue and send them only if there is no pending response of a
previously sent command. This is so because RESP3 is a
request/response protocol, which means clients must wait for the
response to a command before proceeding with the next one.
queue and send them only if there is no pending response. This is
so because RESP3 is a request/response protocol, which means
clients must wait for responses before sending
the next request.
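The request/response discipline described above can be sketched as a plain C++ model (a deliberately simplified, hypothetical illustration, not the actual Aedis queue):

```cpp
#include <cassert>
#include <deque>
#include <string>

// Simplified model: only the front request may be in flight, because
// RESP3 is a request/response protocol.
struct command_queue {
   std::deque<std::string> pending;

   // Returns true when the command can be written immediately, i.e.
   // when no response to a previously sent command is pending.
   bool send(std::string cmd)
   {
      pending.push_back(std::move(cmd));
      return pending.size() == 1;
   }

   // Called when the response to the front command arrives. Returns
   // true when the next queued command can now be written.
   bool on_response()
   {
      pending.pop_front();
      return !pending.empty();
   }
};
```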
\section examples Examples
@@ -538,16 +568,16 @@
@li transaction.cpp: Shows how to read the response to transactions.
@li custom_adapter.cpp: Shows how to write a response adapter that prints to the screen, see \ref low-level-adapters.
\b High \b level \b API
\b High \b level \b API (async only)
@li intro_high_level.cpp: High level API usage example.
@li intro_high_level.cpp: High-level API usage example.
@li aggregates_high_level.cpp: Shows how to receive RESP3 aggregate data types in a general way or in STL containers.
@li subscriber_high_level.cpp: Shows how channel [subscription](https://redis.io/topics/pubsub) works at a high level.
@li subscriber_high_level.cpp: Shows how channel [subscription](https://redis.io/topics/pubsub) works at a high level.
\b Asynchronous \b Servers (high level API)
\b Asynchronous \b Servers (high-level API)
@li echo_server.cpp: Shows the basic principles behind asynchronous communication with a database in an asynchronous server.
@li chat_room.cpp: Shows how to build a scalable chat room that scales to millions of users.
@li chat_room.cpp: Shows how to build a scalable chat room.
\section using-aedis Using Aedis
@@ -581,7 +611,7 @@
```
If you can't use \c configure and \c make (e.g. Windows users)
you can already add the directory where you unpacked aedis to the
you can already add the directory where you unpacked Aedis to the
include directories in your project; otherwise run
```
@@ -650,7 +680,7 @@
Redis features like client side caching, among other things.
@li The Asio asynchronous model.
@li Serialization of user data types that avoids temporaries.
@li Error handling with error-code and exception overload.
@li Error handling with error-code and exception overloads.
@li Health checks.
@li Fine control over memory allocation by means of allocators.
@@ -798,9 +828,12 @@
\section Acknowledgement
I would like to thank Vinícius dos Santos Oliveira for useful discussion about how Aedis consumes buffers in the read operation (among other things).
Some people who were helpful in the development of Aedis
\section Referece
@li Richard Hodges ([madmongo1](https://github.com/madmongo1)): For answering pretty much every question I had about Asio and the design of asynchronous programs.
@li Vinícius dos Santos Oliveira ([vinipsmaker](https://github.com/vinipsmaker)): For useful discussion about how Aedis consumes buffers in the read operation (among other things).
\section Reference
See \subpage any.


@@ -17,7 +17,6 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/container/static_vector.hpp>
#include <aedis/resp3/type.hpp>
#include <aedis/resp3/node.hpp>
@@ -30,53 +29,49 @@ namespace generic {
/** \brief A high level Redis client.
* \ingroup any
*
* This class represents a connection to the Redis server. Some of
* its most important features are
*
* 1. Automatic management of commands. The implementation will send
* commands and read responses automatically for the user.
* 2. Memory reuse. Dynamic memory allocations will decrease with time.
*
* For more details, please see the documentation of each individual
* function.
* This class keeps a connection open to the Redis server where
* commands can be sent at any time. For more details, please see the
* documentation of each individual function.
*/
template <class AsyncReadWriteStream, class Command>
class client {
public:
/// Executor used.
/// Executor type.
using executor_type = typename AsyncReadWriteStream::executor_type;
/// Type of the callback called when a message is received.
/// Callback type of read operations.
using read_handler_type = std::function<void(Command cmd, std::size_t)>;
/// Type of the callback called when a message is written.
/// Callback type of write operations.
using write_handler_type = std::function<void(std::size_t)>;
/// Type of the callback called when a push message is received.
/// Callback type of push operations.
using push_handler_type = std::function<void(std::size_t)>;
/// Type of the callback called when resp3 chuncks are received.
/// Callback type of resp3 operations.
using resp3_handler_type = std::function<void(Command, resp3::node<boost::string_view> const&, boost::system::error_code&)>;
using default_completion_token_type = boost::asio::default_completion_token_t<executor_type>;
/** @brief Configuration parameters.
*/
struct config {
/// Timeout of the async_resolve operation.
/// Timeout of the \c async_resolve operation.
std::chrono::seconds resolve_timeout = std::chrono::seconds{5};
/// Timeout of the async_connect operation.
/// Timeout of the \c async_connect operation.
std::chrono::seconds connect_timeout = std::chrono::seconds{5};
/// Timeout of the async_read operation.
/// Timeout of the \c async_read operation.
std::chrono::seconds read_timeout = std::chrono::seconds{5};
/// Timeout of the async_write operation.
/// Timeout of the \c async_write operation.
std::chrono::seconds write_timeout = std::chrono::seconds{5};
/// Time after which a connection is considered idle if no data is received.
std::chrono::seconds idle_timeout = std::chrono::seconds{10};
/// The maximum size of a read.
/// The maximum size allowed in a read operation.
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
};
@@ -96,6 +91,7 @@ public:
, on_write_{[](std::size_t){}}
, on_push_{[](std::size_t){}}
, on_resp3_{[](Command, resp3::node<boost::string_view> const&, boost::system::error_code&) {}}
, sr_{requests_}
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
, type_{resp3::type::invalid}
, cmd_info_{std::make_pair<Command>(Command::invalid, 0)}
@@ -110,7 +106,7 @@ public:
/** @brief Adds a command to the output command queue.
*
* Adds a command to the end of the next request and signals the
* writer operation there is new message awaiting to be sent.
* writer operation there is a new message awaiting to be sent.
* Otherwise the function is equivalent to serializer::push. @sa
* serializer.
*/
@@ -119,12 +115,10 @@ public:
{
auto const can_write = prepare_next();
serializer<std::string> sr(requests_);
auto const before = requests_.size();
sr.push(cmd, args...);
auto const after = requests_.size();
BOOST_ASSERT(after - before != 0);
auto const d = after - before;
sr_.push(cmd, args...);
auto const d = requests_.size() - before;
BOOST_ASSERT(d != 0);
info_.back().size += d;
if (!has_push_response(cmd)) {
@@ -139,7 +133,7 @@ public:
/** @brief Adds a command to the output command queue.
*
* Adds a command to the end of the next request and signals the
* writer operation there is new message awaiting to be sent.
* writer operation there is a new message awaiting to be sent.
* Otherwise the function is equivalent to
* serializer::push_range2.
* @sa serializer.
@@ -152,12 +146,10 @@ public:
auto const can_write = prepare_next();
serializer<std::string> sr(requests_);
auto const before = requests_.size();
sr.push_range2(cmd, key, begin, end);
auto const after = requests_.size();
BOOST_ASSERT(after - before != 0);
auto const d = after - before;
sr_.push_range2(cmd, key, begin, end);
auto const d = requests_.size() - before;
BOOST_ASSERT(d != 0);
info_.back().size += d;
if (!has_push_response(cmd)) {
@@ -172,7 +164,7 @@ public:
/** @brief Adds a command to the output command queue.
*
* Adds a command to the end of the next request and signals the
* writer operation there is new message awaiting to be sent.
* writer operation there is a new message awaiting to be sent.
* Otherwise the function is equivalent to
* serializer::push_range2.
* @sa serializer.
@@ -185,12 +177,10 @@ public:
auto const can_write = prepare_next();
serializer<std::string> sr(requests_);
auto const before = requests_.size();
sr.push_range2(cmd, begin, end);
auto const after = requests_.size();
BOOST_ASSERT(after - before != 0);
auto const d = after - before;
sr_.push_range2(cmd, begin, end);
auto const d = requests_.size() - before;
BOOST_ASSERT(d != 0);
info_.back().size += d;
if (!has_push_response(cmd)) {
@@ -205,7 +195,7 @@ public:
/** @brief Adds a command to the output command queue.
*
* Adds a command to the end of the next request and signals the
* writer operation there is new message awaiting to be sent.
* writer operation there is a new message awaiting to be sent.
* Otherwise the function is equivalent to
* serializer::push_range.
* @sa serializer.
@@ -221,7 +211,7 @@ public:
/** @brief Adds a command to the output command queue.
*
* Adds a command to the end of the next request and signals the
* writer operation there is new message awaiting to be sent.
* writer operation there is a new message awaiting to be sent.
* Otherwise the function is equivalent to
* serializer::push_range.
* @sa serializer.
@@ -236,35 +226,58 @@ public:
/** @brief Starts communication with the Redis server asynchronously.
*
* This class performs the following steps
* This function performs the following steps
*
* @li Resolve the Redis host as of \c async_resolve with the
* @li Resolves the Redis host as of \c async_resolve with the
* timeout passed in client::config::resolve_timeout.
*
* @li Connect to one of the endpoints returned by the resolve
* @li Connects to one of the endpoints returned by the resolve
* operation with the timeout passed in client::config::connect_timeout.
*
* @li Start the async read operation that keeps reading
* incoming responses. Each individual read uses the timeout
* passed on client::config::read_timeout. After each successful read
* it will call the read or push callback.
* @li Starts the \c async_read operation that keeps reading incoming
* responses. Each individual read uses the timeout passed on
* client::config::read_timeout. After each successful read it
* will call the read or push callback.
*
* @li Start the async write operation that wait for new commands
* @li Starts the \c async_write operation that waits for new commands
* to be sent to Redis. Each individual write uses the timeout
* passed on client::config::write_timeout. After a successful write it
* will call the write callback.
* passed on client::config::write_timeout. After a successful
* write it will call the write callback.
*
* @li Start the check idle operation with the timeout specified
* in client::config::idle_timeout. If no data is received during that
* time interval async_run returns generic::error::idle_timeout.
* @li Starts the check idle operation with the timeout specified
* in client::config::idle_timeout. If no data is received during
* that time interval \c async_run completes with
* generic::error::idle_timeout.
*
* @li Start the healthy check operation that sends
* @li Starts the health check operation that sends
* redis::command::ping to Redis with a frequency equal to
* client::config::idle_timeout / 2.
*
* In addition to the callbacks mentioned above, the read
* operations will call the resp3 callback as soon a new chunks
* of data become available to the user.
* operations will call the resp3 callback as soon as new chunks of
* data become available to the user.
*
* It is safe to call \c async_run after it has returned. In this
* case, any outstanding commands will be sent after the
* connection is re-established. If a disconnect occurs while the
* response to a request has not been received, the client doesn't
* try to resend it to avoid resubmission.
*
* Example:
*
* @code
* awaitable<void> run_with_reconnect(std::shared_ptr<client_type> db)
* {
* auto ex = co_await this_coro::executor;
* asio::steady_timer timer{ex};
*
* for (error_code ec;;) {
* co_await db->async_run("127.0.0.1", "6379", redirect_error(use_awaitable, ec));
* timer.expires_after(std::chrono::seconds{2});
* co_await timer.async_wait(redirect_error(use_awaitable, ec));
* }
* }
* @endcode
*
* \param host IP address or name of the Redis server.
* \param port Port where the Redis server is listening.
@@ -276,17 +289,7 @@ public:
* void f(boost::system::error_code);
* @endcode
*
* Notice this function returns only when there is an error.
*
* @remark
*
* @li It is safe to call this function again after it has
* returned. In that case, any outstanding commands will be sent
* and not get lost.
*
* @li If a disconnect occurs while the response to a request has
* not been received, the client doesn't try to resend it to avoid
* resubmission problem.
* \return This function returns only when there is an error.
*/
template <class CompletionToken = default_completion_token_type>
auto
@@ -347,8 +350,9 @@ private:
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
template <class T, class V> friend struct detail::reader_op;
template <class T, class V> friend struct detail::ping_op;
template <class T, class V> friend struct detail::ping_after_op;
template <class T> friend struct detail::read_op;
template <class T> friend struct detail::read_until_op;
template <class T> friend struct detail::writer_op;
template <class T> friend struct detail::write_op;
template <class T> friend struct detail::run_op;
@@ -357,11 +361,11 @@ private:
template <class T> friend struct detail::check_idle_op;
template <class T> friend struct detail::init_op;
template <class T> friend struct detail::read_write_check_op;
template <class T> friend struct detail::wait_data_op;
template <class T> friend struct detail::wait_for_data_op;
void on_resolve()
{
// If we are comming from a connection that was lost we have to
// If we are coming from a connection that was lost we have to
// reset the socket to a fresh state.
socket_ =
std::make_shared<AsyncReadWriteStream>(read_timer_.get_executor());
@@ -377,6 +381,7 @@ private:
// no commands that were left unanswered from the last
// connection. We can send hello as usual.
BOOST_ASSERT(requests_.empty());
BOOST_ASSERT(commands_.empty());
send(Command::hello, 3);
return;
}
@@ -393,6 +398,7 @@ private:
// the risk of resubmission).
requests_.erase(0, info_.front().size);
// Erases the commands that were lost as well.
commands_.erase(
std::begin(commands_),
std::begin(commands_) + info_.front().cmds);
@@ -403,17 +409,25 @@ private:
// info_.erase(std::begin(info_));
}
std::string tmp;
serializer<std::string> sr(tmp);
sr.push(Command::hello, 3);
auto const hello_size = tmp.size();
std::copy(std::cbegin(requests_), std::cend(requests_), std::back_inserter(tmp));
requests_ = std::move(tmp);
// Code below will add a hello to the front of the request and
// update info_ and commands_ accordingly.
info_.front().size = hello_size + info_.front().size;
++info_.front().cmds;
auto const old_size = requests_.size();
sr_.push(Command::hello, 3);
auto const hello_size = requests_.size() - old_size;
// Push front.
// Now we have to rotate the hello to the front of the request
// (Remember it must always be the first command).
std::rotate(
std::begin(requests_),
std::begin(requests_) + old_size,
std::end(requests_));
// Updates info_.
info_.front().size += hello_size;
info_.front().cmds += 1;
// Updates commands_
commands_.push_back(std::make_pair(Command::hello, hello_size));
std::rotate(
std::begin(commands_),
@@ -421,11 +435,9 @@ private:
std::end(commands_));
}
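The append-then-rotate idiom used above to place \c hello at the front of the request can be illustrated standalone (the function name is hypothetical):

```cpp
#include <algorithm>
#include <cassert>
#include <string>

// Illustrative only: append new data at the end of the buffer (cheap),
// then rotate the appended part to the front, which is how the code
// above moves the hello command to the first position.
std::string push_front_via_rotate(std::string buffer, std::string const& hello)
{
   auto const old_size = buffer.size();
   buffer += hello; // Append at the end.
   std::rotate(
      std::begin(buffer),
      std::begin(buffer) + old_size, // Start of the appended part.
      std::end(buffer));
   return buffer;
}
```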
/* Prepares the back of the queue to receive further commands.
*
* If true is returned the request in the front of the queue can be
* sent to the server. See async_write_some.
*/
// Prepares the back of the queue to receive further commands. If
// true is returned the request in the front of the queue can be
// sent to the server.
bool prepare_next()
{
if (info_.empty()) {
@@ -448,7 +460,7 @@ private:
return info_.front().cmds == 0;
}
// Returns true when the next request can be writen.
// Returns true when the next request can be written.
bool on_cmd(command_info_type)
{
BOOST_ASSERT(!info_.empty());
@@ -477,7 +489,7 @@ private:
}
// Connects the socket to one of the endpoints in endpoints_ and
// stores the successful enpoint in endpoint_.
// stores the successful endpoint in endpoint_.
template <class CompletionToken = default_completion_token_type>
auto
async_connect(CompletionToken&& token = default_completion_token_type{})
@@ -488,6 +500,16 @@ private:
>(detail::connect_op<client>{this}, token, write_timer_.get_executor());
}
template <class CompletionToken = default_completion_token_type>
auto
async_read_until(CompletionToken&& token = default_completion_token_type{})
{
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::read_until_op<client>{this}, token, read_timer_.get_executor());
}
// Reads a complete resp3 response from the socket using the
// timeout config::read_timeout. On a successful read calls
// on_read_ or on_push_ depending on whether the response is a push
@@ -562,7 +584,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::ping_op<client, Command>{this}, token, read_timer_);
>(detail::ping_after_op<client, Command>{this}, token, read_timer_);
}
template <class CompletionToken = default_completion_token_type>
@@ -572,7 +594,7 @@ private:
return boost::asio::async_compose
< CompletionToken
, void(boost::system::error_code)
>(detail::wait_data_op<client>{this}, token, read_timer_);
>(detail::wait_for_data_op<client>{this}, token, read_timer_);
}
template <class CompletionToken = default_completion_token_type>
@@ -636,21 +658,22 @@ private:
// Called when a complete push message is received.
push_handler_type on_push_;
// Called by the parser after each new chunck of resp3 data is
// Called by the parser after each new chunk of resp3 data is
// processed.
resp3_handler_type on_resp3_;
// Buffer used by the read operations.
std::string read_buffer_;
// Requests payload.
// Requests payload and its serializer.
std::string requests_;
serializer<std::string> sr_;
// The commands contained in the requests.
std::vector<command_info_type> commands_;
// Info about the requests.
boost::container::static_vector<info, 2> info_;
std::vector<info> info_;
// Last time we received data.
time_point_type last_data_;
@@ -667,10 +690,8 @@ private:
// See async_resolve.
boost::asio::ip::tcp::resolver::results_type endpoints_;
// Host passed to async_run.
// Host and port passed to async_run.
boost::string_view host_;
// Port passed to async_run.
boost::string_view port_;
};


@@ -28,7 +28,7 @@ namespace detail {
#include <boost/asio/yield.hpp>
template <class Client, class Command>
struct ping_op {
struct ping_after_op {
Client* cli;
boost::asio::coroutine coro;
@@ -36,13 +36,13 @@ struct ping_op {
void
operator()(Self& self, boost::system::error_code ec = {})
{
reenter (coro) {
reenter (coro)
{
BOOST_ASSERT((cli->cfg_.idle_timeout / 2) != std::chrono::seconds{0});
cli->read_timer_.expires_after(cli->cfg_.idle_timeout / 2);
yield cli->read_timer_.async_wait(std::move(self));
if (ec) {
// operation_aborted: ok, not an error.
self.complete({});
self.complete(ec);
return;
}
@@ -54,7 +54,7 @@ struct ping_op {
};
template <class Client>
struct wait_data_op {
struct read_until_op {
Client* cli;
boost::asio::coroutine coro;
@@ -63,21 +63,46 @@ struct wait_data_op {
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro) {
// Detached.
cli->async_ping_after([](boost::system::error_code ec){});
reenter (coro)
{
// Waits for incoming data.
yield boost::asio::async_read_until(*cli->socket_, boost::asio::dynamic_buffer(cli->read_buffer_, cli->cfg_.max_read_size), "\r\n", std::move(self));
yield
boost::asio::async_read_until(
*cli->socket_,
boost::asio::dynamic_buffer(cli->read_buffer_, cli->cfg_.max_read_size),
"\r\n",
std::move(self));
// Cancels the async_ping_after.
cli->read_timer_.cancel();
if (ec) {
self.complete(ec);
return;
}
self.complete(ec);
}
}
};
self.complete({});
template <class Client>
struct wait_for_data_op {
Client* cli;
boost::asio::coroutine coro;
template <class Self>
void operator()( Self& self
, std::array<std::size_t, 2> order = {}
, boost::system::error_code ec1 = {}
, boost::system::error_code ec2 = {})
{
reenter (coro)
{
yield
boost::asio::experimental::make_parallel_group(
[this](auto token) { return cli->async_read_until(token);},
[this](auto token) { return cli->async_ping_after(token);}
).async_wait(
boost::asio::experimental::wait_for_all(),
std::move(self));
// The order of completion is not important.
self.complete(ec1);
}
}
};
@@ -90,8 +115,8 @@ struct check_idle_op {
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
{
reenter (coro) for(;;) {
reenter (coro) for(;;)
{
cli->check_idle_timer_.expires_after(cli->cfg_.idle_timeout);
yield cli->check_idle_timer_.async_wait(std::move(self));
if (ec) {
@@ -122,7 +147,8 @@ struct resolve_op {
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::resolver::results_type res = {})
{
reenter (coro) {
reenter (coro)
{
yield
cli->resv_.async_resolve(cli->host_.data(), cli->port_.data(), std::move(self));
if (ec) {
@@ -147,7 +173,8 @@ struct connect_op {
, boost::system::error_code ec = {}
, boost::asio::ip::tcp::endpoint const& ep = {})
{
reenter (coro) {
reenter (coro)
{
yield
boost::asio::async_connect(
*cli->socket_,
@@ -176,8 +203,8 @@ struct init_op {
, boost::system::error_code ec1 = {}
, boost::system::error_code ec2 = {})
{
reenter (coro) {
reenter (coro)
{
// Tries to resolve with a timeout. We can use the writer
// timer here as there is no ongoing write operation.
cli->write_timer_.expires_after(cli->cfg_.resolve_timeout);
@@ -263,8 +290,8 @@ struct read_write_check_op {
, boost::system::error_code ec2 = {}
, boost::system::error_code ec3 = {})
{
reenter (coro) {
reenter (coro)
{
// Starts the reader and writer ops.
cli->wait_write_timer_.expires_at(std::chrono::steady_clock::time_point::max());
@@ -307,8 +334,8 @@ struct run_op {
template <class Self>
void operator()(Self& self, boost::system::error_code ec = {})
{
reenter (coro) {
reenter (coro)
{
yield cli->async_init(std::move(self));
if (ec) {
self.complete(ec);
@@ -338,8 +365,8 @@ struct write_op {
, std::size_t n = 0
, boost::system::error_code ec2 = {})
{
reenter (coro) {
reenter (coro)
{
BOOST_ASSERT(!cli->info_.empty());
BOOST_ASSERT(cli->info_.front().size != 0);
BOOST_ASSERT(!cli->requests_.empty());
@@ -398,7 +425,8 @@ struct writer_op {
template <class Self>
void operator()(Self& self , boost::system::error_code ec = {})
{
reenter (coro) for (;;) {
reenter (coro) for (;;)
{
yield cli->async_write(std::move(self));
if (ec) {
cli->socket_->close();
@@ -428,7 +456,8 @@ struct read_op {
, std::size_t n = 0
, boost::system::error_code ec2 = {})
{
reenter (coro) {
reenter (coro)
{
cli->read_timer_.expires_after(cli->cfg_.read_timeout);
yield
@@ -483,10 +512,10 @@ struct reader_op {
, boost::system::error_code ec = {}
, std::size_t n = 0)
{
reenter (coro) for (;;) {
boost::ignore_unused(n);
boost::ignore_unused(n);
reenter (coro) for (;;)
{
if (cli->read_buffer_.empty()) {
yield cli->async_wait_for_data(std::move(self));
if (ec) {
@@ -498,13 +527,14 @@ struct reader_op {
BOOST_ASSERT(!cli->read_buffer_.empty());
cli->type_ = resp3::to_type(cli->read_buffer_.front());
cli->cmd_info_ = std::make_pair<>(Command::invalid, 0);
cli->cmd_info_ = std::make_pair(Command::invalid, 0);
if (cli->type_ != resp3::type::push) {
BOOST_ASSERT(!cli->commands_.empty());
cli->cmd_info_ = cli->commands_.front();
}
cli->last_data_ = std::chrono::steady_clock::now();
yield cli->async_read(std::move(self));
if (ec) {
cli->on_reader_exit();

View File

@@ -12,7 +12,7 @@
namespace aedis {
namespace generic {
/** \brief Errors from the generic module.
/** \brief Generic errors.
* \ingroup any
*/
enum class error

View File

@@ -19,7 +19,7 @@
namespace aedis {
namespace generic {
/** @brief Creates a Redis request from user data.
/** @brief Creates Redis requests from user data.
* \ingroup any
*
* A request is composed of one or more Redis commands and is

View File

@@ -448,9 +448,7 @@ char const* to_string(command c);
*/
std::ostream& operator<<(std::ostream& os, command c);
/** \brief Checks whether a command has push response.
* \ingroup any
*/
// Checks whether a command has push response.
bool has_push_response(command cmd);
} // redis

View File

@@ -12,7 +12,7 @@
namespace aedis {
namespace resp3 {
/** \brief RESP3 parsing errors.
/** \brief RESP3 errors.
* \ingroup any
*/
enum class error

View File

@@ -21,14 +21,6 @@ namespace resp3 {
* Redis responses are the pre-order view of the response tree (see
* https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR).
*
* The node class represents one element in the response tree. The
* string type is a template to give more flexibility. The library
* uses
*
* @li @c boost::string_view
* @li @c std::string
* @li @c boost::static_string
*
* \remark Any Redis response can be received in an array of nodes,
* for example \c std::vector<node<std::string>>.
*/

View File

@@ -34,12 +34,10 @@ namespace resp3 {
* For a complete example see examples/intro_sync.cpp. This function
* is implemented in terms of one or more calls to @c
* asio::read_until and @c asio::read functions, and is known as a @a
* composed @a operation.
*
* Furthermore, the implementation may read additional bytes from the
* stream that lie past the end of the message being read. These
* additional bytes are stored in the dynamic buffer, which must be
* preserved for subsequent reads.
* composed @a operation. Furthermore, the implementation may read
* additional bytes from the stream that lie past the end of the
* message being read. These additional bytes are stored in the
* dynamic buffer, which must be preserved for subsequent reads.
*
* \param stream The stream from which to read e.g. a tcp socket.
* \param buf Dynamic buffer (version 2).
@@ -144,12 +142,11 @@ read(
* For a complete example see examples/transaction.cpp. This function
* is implemented in terms of one or more calls to @c
* asio::async_read_until and @c asio::async_read functions, and is
* known as a @a composed @a operation.
*
* Furthermore, the implementation may read additional bytes from the
* stream that lie past the end of the message being read. These
* additional bytes are stored in the dynamic buffer, which must be
* preserved for subsequent reads.
* known as a @a composed @a operation. Furthermore, the
* implementation may read additional bytes from the stream that lie
* past the end of the message being read. These additional bytes are
* stored in the dynamic buffer, which must be preserved for
* subsequent reads.
*
* \param stream The stream from which to read e.g. a tcp socket.
* \param buffer Dynamic buffer (version 2).

View File

@@ -69,18 +69,12 @@ char const* to_string(type t);
*/
std::ostream& operator<<(std::ostream& os, type t);
/** \brief Checks whether the data type is an aggregate.
* \ingroup any
* \param t RESP3 data type.
/* Checks whether the data type is an aggregate.
*/
bool is_aggregate(type t);
/** @brief Checks the data type multiplicity.
* \ingroup any
* \param t RESP3 type.
* \returns For map and attribute data types this function returns 2.
* All other types have value 1.
*/
// For map and attribute data types this function returns 2. All
// other types have value 1.
std::size_t element_multiplicity(type t);
// Returns the wire code of a given type.

View File

@@ -69,9 +69,7 @@ char const* to_string(command c);
*/
std::ostream& operator<<(std::ostream& os, command c);
/** \brief Checks whether a command has push response.
* \ingroup any
*/
// Checks whether a command has push response.
bool has_push_response(command cmd);
} // sentinel

View File

@@ -1,5 +1,5 @@
AC_PREREQ([2.69])
AC_INIT([Aedis], [0.1.1], [mzimbres@gmail.com])
AC_INIT([Aedis], [0.1.2], [mzimbres@gmail.com])
AC_CONFIG_MACRO_DIR([m4])
#AC_CONFIG_SRCDIR([src/aedis.cpp])
AC_CONFIG_HEADERS([config.h])

View File

@@ -24,7 +24,6 @@ using aedis::redis::command;
using aedis::generic::client;
using aedis::user_session;
using aedis::user_session_base;
using client_type = client<net::ip::tcp::socket, command>;
using response_type = std::vector<node<std::string>>;
@@ -40,8 +39,10 @@ public:
adapter_(nd, ec);
}
void on_read(command cmd, std::size_t)
void on_read(command cmd, std::size_t n)
{
std::cout << "on_read: " << cmd << " " << n << std::endl;
switch (cmd) {
case command::ping:
if (resp_.front().value != "PONG") {
@@ -77,6 +78,22 @@ private:
std::queue<std::shared_ptr<user_session_base>> sessions_;
};
net::awaitable<void>
run_with_reconnect(std::shared_ptr<client_type> db)
{
auto ex = co_await net::this_coro::executor;
boost::asio::steady_timer timer{ex};
for (boost::system::error_code ec;;) {
co_await db->async_run("127.0.0.1", "6379",
net::redirect_error(net::use_awaitable, ec));
timer.expires_after(std::chrono::seconds{2});
co_await timer.async_wait(net::redirect_error(net::use_awaitable, ec));
}
}
net::awaitable<void>
listener(
std::shared_ptr<net::ip::tcp::acceptor> acc,
@@ -108,10 +125,7 @@ int main()
auto db = std::make_shared<client_type>(ioc.get_executor());
auto recv = std::make_shared<receiver>(db);
db->set_receiver(recv);
// TODO: Close the listener when async_run returns.
db->async_run("127.0.0.1", "6379",
[](auto ec){ std::cout << ec.message() << std::endl;});
co_spawn(ioc, run_with_reconnect(db), net::detached);
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
auto acc = std::make_shared<net::ip::tcp::acceptor>(ioc.get_executor(), endpoint);

View File

@@ -382,6 +382,7 @@ public:
void on_write(std::size_t)
{
std::cout << "on_write" << std::endl;
if (!std::exchange(sent_, true)) {
db_->send(command::del, "key");
db_->send(command::client, "PAUSE", 5000);