mirror of
https://github.com/boostorg/redis.git
synced 2026-01-29 07:42:37 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
394bdf5b5e | ||
|
|
a4745a1a5d | ||
|
|
b99da962d1 | ||
|
|
172de6235c | ||
|
|
5e99a58685 | ||
|
|
b952a2d2d8 | ||
|
|
16d1f8df24 |
2
INSTALL
2
INSTALL
@@ -1 +1 @@
|
||||
See file:///tmp/aedis/html/installation.html
|
||||
See https://mzimbres.github.io/aedis/#using-aedis
|
||||
|
||||
153
aedis/aedis.hpp
153
aedis/aedis.hpp
@@ -32,7 +32,7 @@
|
||||
@li Support for Redis [sentinel](https://redis.io/docs/manual/sentinel).
|
||||
@li Sync and async API.
|
||||
|
||||
In addition to that, Aedis provides a high level client that offers the following functionality
|
||||
In addition to that, Aedis provides a high-level client that offers the following functionality
|
||||
|
||||
@li Management of message queues.
|
||||
@li Simplified handling of server pushes.
|
||||
@@ -54,7 +54,7 @@
|
||||
@li Get and return its old value.
|
||||
@li Quit
|
||||
|
||||
The coroutine-based async implementation of the steps above look like
|
||||
The coroutine-based asynchronous implementation of the steps above look like
|
||||
|
||||
@code
|
||||
net::awaitable<std::string> set(net::ip::tcp::endpoint ep)
|
||||
@@ -95,9 +95,9 @@
|
||||
|
||||
\subsection requests Requests
|
||||
|
||||
As stated above, request are created by defining a storage object
|
||||
As stated above, requests are created by defining a storage object
|
||||
and a serializer that knows how to convert user data into valid
|
||||
RESP3 wire-format. Redis request are composed of one or more
|
||||
RESP3 wire-format. They are composed of one or more
|
||||
commands (in Redis documentation they are called [pipelines](https://redis.io/topics/pipelining)),
|
||||
which means users can add
|
||||
as many commands to the request as they like, a feature that aids
|
||||
@@ -192,21 +192,21 @@
|
||||
---------|-------------------------------------|--------------
|
||||
lpush | Number | https://redis.io/commands/lpush
|
||||
lrange | Array | https://redis.io/commands/lrange
|
||||
set | Simple string, null or blob string | https://redis.io/commands/set
|
||||
get | Blob string | https://redis.io/commands/get
|
||||
set | Simple-string, null or blob-string | https://redis.io/commands/set
|
||||
get | Blob-string | https://redis.io/commands/get
|
||||
smembers | Set | https://redis.io/commands/smembers
|
||||
hgetall | Map | https://redis.io/commands/hgetall
|
||||
|
||||
Once the RESP3 type of a given response is known we can choose a
|
||||
proper C++ data structure to receive it in. Fortunately, this is a
|
||||
simple task for most types. The table below summarise the options
|
||||
simple task for most types. The table below summarises the options
|
||||
|
||||
RESP3 type | C++ | Type
|
||||
---------------|--------------------------------------------------------------|------------------
|
||||
Simple string | \c std::string | Simple
|
||||
Simple error | \c std::string | Simple
|
||||
Blob string | \c std::string, \c std::vector | Simple
|
||||
Blob error | \c std::string, \c std::vector | Simple
|
||||
Simple-string | \c std::string | Simple
|
||||
Simple-error | \c std::string | Simple
|
||||
Blob-string | \c std::string, \c std::vector | Simple
|
||||
Blob-error | \c std::string, \c std::vector | Simple
|
||||
Number | `long long`, `int`, `std::size_t`, \c std::string | Simple
|
||||
Double | `double`, \c std::string | Simple
|
||||
Null | `boost::optional<T>` | Simple
|
||||
@@ -216,7 +216,7 @@
|
||||
Push | \c std::vector, \c std::map, \c std::unordered_map | Aggregate
|
||||
|
||||
Responses that contain nested aggregates or heterogeneous data
|
||||
types will be given special treatment laster. As of this writing,
|
||||
types will be given special treatment later. As of this writing,
|
||||
not all RESP3 types are used by the Redis server, which means in
|
||||
practice users will be concerned with a reduced subset of the
|
||||
RESP3 specification. Now let us see some examples
|
||||
@@ -252,10 +252,10 @@
|
||||
co_await resp3::async_read(socket, dbuffer, adapt(vec));
|
||||
@endcode
|
||||
|
||||
In other words, it is pretty straightforward, just pass the result
|
||||
of \c adapt to the read function and make sure the response data
|
||||
type fits in the type you are calling @c adapter(...) with. All
|
||||
standard C++ containers are supported by aedis.
|
||||
In other words, it is straightforward, just pass the result of \c
|
||||
adapt to the read function and make sure the response data type is
|
||||
compatible with the data structure you are calling @c adapter(...)
|
||||
with. All standard C++ containers are supported by Aedis.
|
||||
|
||||
\subsubsection Optional
|
||||
|
||||
@@ -269,9 +269,8 @@
|
||||
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(umap));
|
||||
@endcode
|
||||
|
||||
Everything else stays pretty much the same, before accessing data,
|
||||
users will have to check or assert the optional contains a
|
||||
value.
|
||||
Everything else stays the same, before accessing data, users will
|
||||
have to check or assert the optional contains a value.
|
||||
|
||||
\subsubsection heterogeneous_aggregates Heterogeneous aggregates
|
||||
|
||||
@@ -279,7 +278,7 @@
|
||||
contain heterogeneous data, for example, an array that contains
|
||||
integers, strings nested sets etc. Aedis supports reading such
|
||||
aggregates in a \c std::tuple efficiently as long as the they
|
||||
don't contain 2-order nested aggregates e.g. an array that
|
||||
don't contain 3-order nested aggregates e.g. an array that
|
||||
contains an array of arrays. For example, to read the response to
|
||||
a \c hello command we can use the following response type.
|
||||
|
||||
@@ -321,11 +320,10 @@
|
||||
co_await resp3::async_read(socket, dynamic_buffer(buffer), adapt(trans));
|
||||
@endcode
|
||||
|
||||
Note that we are not ignoring the response to the commands
|
||||
themselves above but whether they have been successfully queued.
|
||||
Only after @c exec is received Redis will execute them in
|
||||
sequence. The response will then be sent in a single chunk to the
|
||||
client.
|
||||
Note that above we are not ignoring the response to the commands
|
||||
themselves but whether they have been successfully queued. Only
|
||||
after @c exec is received Redis will execute them in sequence and
|
||||
send all responses together in an array.
|
||||
|
||||
\subsubsection Serialization
|
||||
|
||||
@@ -334,7 +332,7 @@
|
||||
strings, for example
|
||||
|
||||
@code
|
||||
sr.push(command::set, "key", "{"Server": "Redis"}"); // Unquoted string
|
||||
sr.push(command::set, "key", "{"Server": "Redis"}"); // Unquoted for readability.
|
||||
sr.push(command::get, "key")
|
||||
@endcode
|
||||
|
||||
@@ -365,14 +363,13 @@
|
||||
|
||||
\subsubsection gen-case The general case
|
||||
|
||||
As already mentioned, there are cases where the response to Redis
|
||||
As already mentioned, there are cases where responses to Redis
|
||||
commands won't fit in the model presented above, some examples are
|
||||
|
||||
@li Commands (like \c set) whose response don't have a fixed
|
||||
RESP3 type. Expecting an \c int and receiving a blob string
|
||||
RESP3 type. Expecting an \c int and receiving a blob-string
|
||||
will result in error.
|
||||
@li RESP3 responses that contain three levels of (nested) aggregates can't be
|
||||
read in STL containers.
|
||||
@li RESP3 aggregates that contain nested aggregates can't be read in STL containers.
|
||||
@li Transactions with a dynamic number of commands can't be read in a \c std::tuple.
|
||||
|
||||
To deal with these cases Aedis provides the \c resp3::node
|
||||
@@ -443,30 +440,32 @@
|
||||
|
||||
As stated earlier, the low-level API is very useful for tasks that
|
||||
can be performed with short lived connections. Sometimes however,
|
||||
the need for long-lived connections becomes apparent and compeling
|
||||
the need for long-lived connections becomes compeling
|
||||
|
||||
@li \b Server \b pushes: Short lived connections can't handle server pushes (e.g. https://redis.io/topics/client-side-caching and https://redis.io/topics/notifications).
|
||||
@li \b Pubsub: Just like server pushes, to use Redis pubsub users need long lasting connections (https://redis.io/topics/pubsub).
|
||||
@li \b Performance: Keep opening and closing connections impact performance.
|
||||
@li \b Server \b pushes: Short lived connections can't deal with server pushes, that means no [client side caching](https://redis.io/topics/client-side-caching), [notifications](https://redis.io/topics/notifications) and [pubsub](https://redis.io/topics/pubsub).
|
||||
@li \b Performance: Keep opening and closing connections impact performance serverely.
|
||||
@li \b Pipeline: Code such as shown in \ref low-level-api don't support pipelines well since it can only send a fixed number of commands at time. It misses important optimization opportunities (https://redis.io/topics/pipelining).
|
||||
|
||||
A serious implementation that supports the points listed above is
|
||||
far from trivial as it involves the following async operations
|
||||
far from trivial and involves many complex asynchronous operations
|
||||
|
||||
@li \c async_resolve: Resolve a hostname.
|
||||
@li \c async_connect: Connect to Redis.
|
||||
@li \c async_read: Performed in a loop as long as the connection lives.
|
||||
@li \c async_write: Performed everytime a new message is added.
|
||||
@li \c async_wait: To timout all operations above if the server becomes unresponsive.
|
||||
|
||||
In addition to that
|
||||
Notice that many of the operations above will run concurrently with each other and, in addition to that
|
||||
|
||||
@li Each operation listed above requires timeout support.
|
||||
@li \c async_write operations require management of the message queue to prevent concurrent writes.
|
||||
@li Healthy checks must be sent periodically by the client to detect a dead or unresponsive server.
|
||||
@li Recovery after a disconnection to avoid loosing enqueued commands.
|
||||
|
||||
To avoid imposing this burden on every user, Aedis provides its
|
||||
own implementation. The general form of a program that uses the
|
||||
high-level api looks like this
|
||||
Expecting users to implement these points themselves is
|
||||
unrealistic and could result in code that performs poorly and
|
||||
can't handle errors properly. To avoid all of that, Aedis
|
||||
provides its own implementation. The general form of a program
|
||||
that uses the high-level API looks like this
|
||||
|
||||
@code
|
||||
int main()
|
||||
@@ -477,7 +476,7 @@
|
||||
auto recv = std::make_shared<receiver>(db);
|
||||
db.set_receiver(recv);
|
||||
|
||||
db.async_run("127.0.0.1", "6379", [](auto ec){ std::cout << ec.message() << std::endl;});
|
||||
db.async_run("127.0.0.1", "6379", [](auto ec){ ... });
|
||||
|
||||
ioc.run();
|
||||
}
|
||||
@@ -487,18 +486,49 @@
|
||||
receiver. For example
|
||||
|
||||
@code
|
||||
// Callbacks.
|
||||
struct receiver {
|
||||
void on_resp3(command cmd, node<boost::string_view> const& nd, boost::system::error_code& ec) { ... }
|
||||
|
||||
void on_resp3(command cmd, node<string_view> const& nd, error_code& ec) { ... }
|
||||
void on_read(command cmd, std::size_t) { ... }
|
||||
|
||||
void on_write(std::size_t n) { ... }
|
||||
|
||||
void on_push(std::size_t n) { }
|
||||
void on_write(std::size_t n) { ... }
|
||||
};
|
||||
@endcode
|
||||
|
||||
The functions in the receiver are callbacks that are called by the client class.
|
||||
The functions in the receiver above are callbacks that will be
|
||||
called when events arrives
|
||||
|
||||
@li \c on_resp3: Called when a new chunk of resp3 data is parsed.
|
||||
@li \c on_read: Called after the response to a command has been successfully read.
|
||||
@li \c on_push: Called when a server push is received.
|
||||
@li \c on_write: Called after a request has been successfully written to the stream.
|
||||
|
||||
The callbacks above are never called on errors, instead the \c
|
||||
async_run function returns. Reconnection is also supported, for
|
||||
example
|
||||
|
||||
@code
|
||||
net::awaitable<void> run(std::shared_ptr<client_type> db)
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
|
||||
boost::asio::steady_timer timer{ex};
|
||||
|
||||
for (error_code ec;;) {
|
||||
co_await db->async_run("127.0.0.1", "6379", redirect_error(use_awaitable, ec));
|
||||
|
||||
// Log the error.
|
||||
std::clog << ec.message() << std::endl;
|
||||
|
||||
// Wait two seconds and try again.
|
||||
timer.expires_after(std::chrono::seconds{2});
|
||||
co_await timer.async_wait(redirect_error(use_awaitable, ec));
|
||||
}
|
||||
}
|
||||
@endcode
|
||||
|
||||
when reconnecting the client will recover requests that haven't
|
||||
been sent to Redis yet.
|
||||
|
||||
\subsection high-level-sending-cmds Sending commands
|
||||
|
||||
@@ -518,10 +548,10 @@
|
||||
@endcode
|
||||
|
||||
The \c send functions in this case will add commands to the output
|
||||
queue and send them only if there is no pending response of a
|
||||
previously sent command. This is so because RESP3 is a
|
||||
request/response protocol, which means clients must wait for the
|
||||
response to a command before proceeding with the next one.
|
||||
queue and send them only if there is no pending response. This is
|
||||
so because RESP3 is a request/response protocol, which means
|
||||
clients must wait for responses before sending
|
||||
the next request.
|
||||
|
||||
\section examples Examples
|
||||
|
||||
@@ -538,16 +568,16 @@
|
||||
@li transaction.cpp: Shows how to read the response to transactions.
|
||||
@li custom_adapter.cpp: Shows how to write a response adapter that prints to the screen, see \ref low-level-adapters.
|
||||
|
||||
\b High \b level \b API
|
||||
\b High \b level \b API (async only)
|
||||
|
||||
@li intro_high_level.cpp: High level API usage example.
|
||||
@li intro_high_level.cpp: High-level API usage example.
|
||||
@li aggregates_high_level.cpp: Shows how receive RESP3 aggregate data types in a general way or in STL containers.
|
||||
@li subscriber_high_level.cpp: Shows how channel [subscription](https://redis.io/topics/pubsub) works at a high level.
|
||||
@li subscriber_high_level.cpp: Shows how channel [subscription](https://redis.io/topics/pubsub) works at a high-level.
|
||||
|
||||
\b Asynchronous \b Servers (high level API)
|
||||
\b Asynchronous \b Servers (high-level API)
|
||||
|
||||
@li echo_server.cpp: Shows the basic principles behind asynchronous communication with a database in an asynchronous server.
|
||||
@li chat_room.cpp: Shows how to build a scalable chat room that scales to millions of users.
|
||||
@li chat_room.cpp: Shows how to build a scalable chat room.
|
||||
|
||||
\section using-aedis Using Aedis
|
||||
|
||||
@@ -581,7 +611,7 @@
|
||||
```
|
||||
|
||||
If you can't use \c configure and \c make (e.g. Windows users)
|
||||
you can already add the directory where you unpacked aedis to the
|
||||
you can already add the directory where you unpacked Aedis to the
|
||||
include directories in your project, otherwise run
|
||||
|
||||
```
|
||||
@@ -650,7 +680,7 @@
|
||||
Redis features like client side caching, among other things.
|
||||
@li The Asio asynchronous model.
|
||||
@li Serialization of user data types that avoids temporaries.
|
||||
@li Error handling with error-code and exception overload.
|
||||
@li Error handling with error-code and exception overloads.
|
||||
@li Healthy checks.
|
||||
@li Fine control over memory allocation by means of allocators.
|
||||
|
||||
@@ -798,9 +828,12 @@
|
||||
|
||||
\section Acknowledgement
|
||||
|
||||
I would like to thank Vinícius dos Santos Oliveira for useful discussion about how Aedis consumes buffers in the read operation (among other things).
|
||||
Some people that were helpful in the development of Aedis
|
||||
|
||||
\section Referece
|
||||
@li Richard Hodges ([madmongo1](https://github.com/madmongo1)): For answering pretty much every question I had about Asio and the design of asynchronous programs.
|
||||
@li Vinícius dos Santos Oliveira ([vinipsmaker](https://github.com/vinipsmaker)): For useful discussion about how Aedis consumes buffers in the read operation (among other things).
|
||||
|
||||
\section Reference
|
||||
|
||||
See \subpage any.
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/container/static_vector.hpp>
|
||||
|
||||
#include <aedis/resp3/type.hpp>
|
||||
#include <aedis/resp3/node.hpp>
|
||||
@@ -30,53 +29,49 @@ namespace generic {
|
||||
/** \brief A high level Redis client.
|
||||
* \ingroup any
|
||||
*
|
||||
* This class represents a connection to the Redis server. Some of
|
||||
* its most important features are
|
||||
*
|
||||
* 1. Automatic management of commands. The implementation will send
|
||||
* commands and read responses automatically for the user.
|
||||
* 2. Memory reuse. Dynamic memory allocations will decrease with time.
|
||||
*
|
||||
* For more details, please see the documentation of each individual
|
||||
* function.
|
||||
* This class keeps a connection open to the Redis server where
|
||||
* commands can be sent at any time. For more details, please see the
|
||||
* documentation of each individual function.
|
||||
*/
|
||||
template <class AsyncReadWriteStream, class Command>
|
||||
class client {
|
||||
public:
|
||||
/// Executor used.
|
||||
/// Executor type.
|
||||
using executor_type = typename AsyncReadWriteStream::executor_type;
|
||||
|
||||
/// Type of the callback called when a message is received.
|
||||
/// Callback type of read operations.
|
||||
using read_handler_type = std::function<void(Command cmd, std::size_t)>;
|
||||
|
||||
/// Type of the callback called when a message is written.
|
||||
/// Callback type of write operations.
|
||||
using write_handler_type = std::function<void(std::size_t)>;
|
||||
|
||||
/// Type of the callback called when a push message is received.
|
||||
/// Callback type of push operations.
|
||||
using push_handler_type = std::function<void(std::size_t)>;
|
||||
|
||||
/// Type of the callback called when resp3 chuncks are received.
|
||||
/// Callback type of resp3 operations.
|
||||
using resp3_handler_type = std::function<void(Command, resp3::node<boost::string_view> const&, boost::system::error_code&)>;
|
||||
|
||||
using default_completion_token_type = boost::asio::default_completion_token_t<executor_type>;
|
||||
|
||||
/** @brief Configuration parameters.
|
||||
*/
|
||||
struct config {
|
||||
/// Timeout of the async_resolve operation.
|
||||
/// Timeout of the \c async_resolve operation.
|
||||
std::chrono::seconds resolve_timeout = std::chrono::seconds{5};
|
||||
|
||||
/// Timeout of the async_connect operation.
|
||||
/// Timeout of the \c async_connect operation.
|
||||
std::chrono::seconds connect_timeout = std::chrono::seconds{5};
|
||||
|
||||
/// Timeout of the async_read operation.
|
||||
/// Timeout of the \c async_read operation.
|
||||
std::chrono::seconds read_timeout = std::chrono::seconds{5};
|
||||
|
||||
/// Timeout of the async_write operation.
|
||||
/// Timeout of the \c async_write operation.
|
||||
std::chrono::seconds write_timeout = std::chrono::seconds{5};
|
||||
|
||||
/// Time after which a connection is considered idle if no data is received.
|
||||
std::chrono::seconds idle_timeout = std::chrono::seconds{10};
|
||||
|
||||
/// The maximum size of a read.
|
||||
/// The maximum size allwed in a read operation.
|
||||
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();
|
||||
};
|
||||
|
||||
@@ -96,6 +91,7 @@ public:
|
||||
, on_write_{[](std::size_t){}}
|
||||
, on_push_{[](std::size_t){}}
|
||||
, on_resp3_{[](Command, resp3::node<boost::string_view> const&, boost::system::error_code&) {}}
|
||||
, sr_{requests_}
|
||||
, last_data_{std::chrono::time_point<std::chrono::steady_clock>::min()}
|
||||
, type_{resp3::type::invalid}
|
||||
, cmd_info_{std::make_pair<Command>(Command::invalid, 0)}
|
||||
@@ -110,7 +106,7 @@ public:
|
||||
/** @brief Adds a command to the output command queue.
|
||||
*
|
||||
* Adds a command to the end of the next request and signals the
|
||||
* writer operation there is new message awaiting to be sent.
|
||||
* writer operation there is a new message awaiting to be sent.
|
||||
* Otherwise the function is equivalent to serializer::push. @sa
|
||||
* serializer.
|
||||
*/
|
||||
@@ -119,12 +115,10 @@ public:
|
||||
{
|
||||
auto const can_write = prepare_next();
|
||||
|
||||
serializer<std::string> sr(requests_);
|
||||
auto const before = requests_.size();
|
||||
sr.push(cmd, args...);
|
||||
auto const after = requests_.size();
|
||||
BOOST_ASSERT(after - before != 0);
|
||||
auto const d = after - before;
|
||||
sr_.push(cmd, args...);
|
||||
auto const d = requests_.size() - before;
|
||||
BOOST_ASSERT(d != 0);
|
||||
info_.back().size += d;;
|
||||
|
||||
if (!has_push_response(cmd)) {
|
||||
@@ -139,7 +133,7 @@ public:
|
||||
/** @brief Adds a command to the output command queue.
|
||||
*
|
||||
* Adds a command to the end of the next request and signals the
|
||||
* writer operation there is new message awaiting to be sent.
|
||||
* writer operation there is a new message awaiting to be sent.
|
||||
* Otherwise the function is equivalent to
|
||||
* serializer::push_range2.
|
||||
* @sa serializer.
|
||||
@@ -152,12 +146,10 @@ public:
|
||||
|
||||
auto const can_write = prepare_next();
|
||||
|
||||
serializer<std::string> sr(requests_);
|
||||
auto const before = requests_.size();
|
||||
sr.push_range2(cmd, key, begin, end);
|
||||
auto const after = requests_.size();
|
||||
BOOST_ASSERT(after - before != 0);
|
||||
auto const d = after - before;
|
||||
sr_.push_range2(cmd, key, begin, end);
|
||||
auto const d = requests_.size() - before;
|
||||
BOOST_ASSERT(d != 0);
|
||||
info_.back().size += d;
|
||||
|
||||
if (!has_push_response(cmd)) {
|
||||
@@ -172,7 +164,7 @@ public:
|
||||
/** @brief Adds a command to the output command queue.
|
||||
*
|
||||
* Adds a command to the end of the next request and signals the
|
||||
* writer operation there is new message awaiting to be sent.
|
||||
* writer operation there is a new message awaiting to be sent.
|
||||
* Otherwise the function is equivalent to
|
||||
* serializer::push_range2.
|
||||
* @sa serializer.
|
||||
@@ -185,12 +177,10 @@ public:
|
||||
|
||||
auto const can_write = prepare_next();
|
||||
|
||||
serializer<std::string> sr(requests_);
|
||||
auto const before = requests_.size();
|
||||
sr.push_range2(cmd, begin, end);
|
||||
auto const after = requests_.size();
|
||||
BOOST_ASSERT(after - before != 0);
|
||||
auto const d = after - before;
|
||||
sr_.push_range2(cmd, begin, end);
|
||||
auto const d = requests_.size() - before;
|
||||
BOOST_ASSERT(d != 0);
|
||||
info_.back().size += d;
|
||||
|
||||
if (!has_push_response(cmd)) {
|
||||
@@ -205,7 +195,7 @@ public:
|
||||
/** @brief Adds a command to the output command queue.
|
||||
*
|
||||
* Adds a command to the end of the next request and signals the
|
||||
* writer operation there is new message awaiting to be sent.
|
||||
* writer operation there is a new message awaiting to be sent.
|
||||
* Otherwise the function is equivalent to
|
||||
* serializer::push_range.
|
||||
* @sa serializer.
|
||||
@@ -221,7 +211,7 @@ public:
|
||||
/** @brief Adds a command to the output command queue.
|
||||
*
|
||||
* Adds a command to the end of the next request and signals the
|
||||
* writer operation there is new message awaiting to be sent.
|
||||
* writer operation there is a new message awaiting to be sent.
|
||||
* Otherwise the function is equivalent to
|
||||
* serializer::push_range.
|
||||
* @sa serializer.
|
||||
@@ -236,35 +226,58 @@ public:
|
||||
|
||||
/** @brief Starts communication with the Redis server asynchronously.
|
||||
*
|
||||
* This class performs the following steps
|
||||
* This function performs the following steps
|
||||
*
|
||||
* @li Resolve the Redis host as of \c async_resolve with the
|
||||
* @li Resolves the Redis host as of \c async_resolve with the
|
||||
* timeout passed in client::config::resolve_timeout.
|
||||
*
|
||||
* @li Connect to one of the endpoints returned by the resolve
|
||||
* @li Connects to one of the endpoints returned by the resolve
|
||||
* operation with the timeout passed in client::config::connect_timeout.
|
||||
*
|
||||
* @li Start the async read operation that keeps reading
|
||||
* incoming responses. Each individual read uses the timeout
|
||||
* passed on client::config::read_timeout. After each successful read
|
||||
* it will call the read or push callback.
|
||||
* @li Starts the \c async_read operation that keeps reading incoming
|
||||
* responses. Each individual read uses the timeout passed on
|
||||
* client::config::read_timeout. After each successful read it
|
||||
* will call the read or push callback.
|
||||
*
|
||||
* @li Start the async write operation that wait for new commands
|
||||
* @li Starts the \c async_write operation that waits for new commands
|
||||
* to be sent to Redis. Each individual write uses the timeout
|
||||
* passed on client::config::write_timeout. After a successful write it
|
||||
* will call the write callback.
|
||||
* passed on client::config::write_timeout. After a successful
|
||||
* write it will call the write callback.
|
||||
*
|
||||
* @li Start the check idle operation with the timeout specified
|
||||
* in client::config::idle_timeout. If no data is received during that
|
||||
* time interval async_run returns generic::error::idle_timeout.
|
||||
* @li Starts the check idle operation with the timeout specified
|
||||
* in client::config::idle_timeout. If no data is received during
|
||||
* that time interval \c async_run completes with
|
||||
* generic::error::idle_timeout.
|
||||
*
|
||||
* @li Start the healthy check operation that sends
|
||||
* @li Starts the healthy check operation that sends
|
||||
* redis::command::ping to Redis with a frequency equal to
|
||||
* client::config::idle_timeout / 2.
|
||||
*
|
||||
* In addition to the callbacks mentioned above, the read
|
||||
* operations will call the resp3 callback as soon a new chunks
|
||||
* of data become available to the user.
|
||||
* operations will call the resp3 callback as soon a new chunks of
|
||||
* data become available to the user.
|
||||
*
|
||||
* It is safe to call \c async_run after it has returned. In this
|
||||
* case, any outstanding commands will be sent after the
|
||||
* connection is restablished. If a disconnect occurs while the
|
||||
* response to a request has not been received, the client doesn't
|
||||
* try to resend it to avoid resubmission.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* @code
|
||||
* awaitable<void> run_with_reconnect(std::shared_ptr<client_type> db)
|
||||
* {
|
||||
* auto ex = co_await this_coro::executor;
|
||||
* asio::steady_timer timer{ex};
|
||||
*
|
||||
* for (error_code ec;;) {
|
||||
* co_await db->async_run("127.0.0.1", "6379", redirect_error(use_awaitable, ec));
|
||||
* timer.expires_after(std::chrono::seconds{2});
|
||||
* co_await timer.async_wait(redirect_error(use_awaitable, ec));
|
||||
* }
|
||||
* }
|
||||
* @endcode
|
||||
*
|
||||
* \param host Ip address or name of the Redis server.
|
||||
* \param port Port where the Redis server is listening.
|
||||
@@ -276,17 +289,7 @@ public:
|
||||
* void f(boost::system::error_code);
|
||||
* @endcode
|
||||
*
|
||||
* Notice this function returns only when there is an error.
|
||||
*
|
||||
* @remark
|
||||
*
|
||||
* @li It is safe to call this function again after it has
|
||||
* returned. In that case, any outstanding commands will be sent
|
||||
* and not get lost.
|
||||
*
|
||||
* @li If a disconnect occurs while the response to a request has
|
||||
* not been received, the client doesn't try to resend it to avoid
|
||||
* resubmission problem.
|
||||
* \return This function returns only when there is an error.
|
||||
*/
|
||||
template <class CompletionToken = default_completion_token_type>
|
||||
auto
|
||||
@@ -347,8 +350,9 @@ private:
|
||||
using time_point_type = std::chrono::time_point<std::chrono::steady_clock>;
|
||||
|
||||
template <class T, class V> friend struct detail::reader_op;
|
||||
template <class T, class V> friend struct detail::ping_op;
|
||||
template <class T, class V> friend struct detail::ping_after_op;
|
||||
template <class T> friend struct detail::read_op;
|
||||
template <class T> friend struct detail::read_until_op;
|
||||
template <class T> friend struct detail::writer_op;
|
||||
template <class T> friend struct detail::write_op;
|
||||
template <class T> friend struct detail::run_op;
|
||||
@@ -357,11 +361,11 @@ private:
|
||||
template <class T> friend struct detail::check_idle_op;
|
||||
template <class T> friend struct detail::init_op;
|
||||
template <class T> friend struct detail::read_write_check_op;
|
||||
template <class T> friend struct detail::wait_data_op;
|
||||
template <class T> friend struct detail::wait_for_data_op;
|
||||
|
||||
void on_resolve()
|
||||
{
|
||||
// If we are comming from a connection that was lost we have to
|
||||
// If we are coming from a connection that was lost we have to
|
||||
// reset the socket to a fresh state.
|
||||
socket_ =
|
||||
std::make_shared<AsyncReadWriteStream>(read_timer_.get_executor());
|
||||
@@ -377,6 +381,7 @@ private:
|
||||
// no commands that were left unresponded from the last
|
||||
// connection. We can send hello as usual.
|
||||
BOOST_ASSERT(requests_.empty());
|
||||
BOOST_ASSERT(commands_.empty());
|
||||
send(Command::hello, 3);
|
||||
return;
|
||||
}
|
||||
@@ -393,6 +398,7 @@ private:
|
||||
// the risc of resubmission).
|
||||
requests_.erase(0, info_.front().size);
|
||||
|
||||
// Erases the commands that were lost as well.
|
||||
commands_.erase(
|
||||
std::begin(commands_),
|
||||
std::begin(commands_) + info_.front().cmds);
|
||||
@@ -403,17 +409,25 @@ private:
|
||||
// info_.erase(std::begin(info_));
|
||||
}
|
||||
|
||||
std::string tmp;
|
||||
serializer<std::string> sr(tmp);
|
||||
sr.push(Command::hello, 3);
|
||||
auto const hello_size = tmp.size();
|
||||
std::copy(std::cbegin(requests_), std::cend(requests_), std::back_inserter(tmp));
|
||||
requests_ = std::move(tmp);
|
||||
// Code below will add a hello to the front of the request and
|
||||
// update info_ and commands_ accordingly.
|
||||
|
||||
info_.front().size = hello_size + info_.front().size;
|
||||
++info_.front().cmds;
|
||||
auto const old_size = requests_.size();
|
||||
sr_.push(Command::hello, 3);
|
||||
auto const hello_size = requests_.size() - old_size;;
|
||||
|
||||
// Push front.
|
||||
// Now we have to rotate the hello to the front of the request
|
||||
// (Remember it must always be the first command).
|
||||
std::rotate(
|
||||
std::begin(requests_),
|
||||
std::begin(requests_) + old_size,
|
||||
std::end(requests_));
|
||||
|
||||
// Updates info_.
|
||||
info_.front().size += hello_size;
|
||||
info_.front().cmds += 1;
|
||||
|
||||
// Updates commands_
|
||||
commands_.push_back(std::make_pair(Command::hello, hello_size));
|
||||
std::rotate(
|
||||
std::begin(commands_),
|
||||
@@ -421,11 +435,9 @@ private:
|
||||
std::end(commands_));
|
||||
}
|
||||
|
||||
/* Prepares the back of the queue to receive further commands.
|
||||
*
|
||||
* If true is returned the request in the front of the queue can be
|
||||
* sent to the server. See async_write_some.
|
||||
*/
|
||||
// Prepares the back of the queue to receive further commands. If
|
||||
// true is returned the request in the front of the queue can be
|
||||
// sent to the server.
|
||||
bool prepare_next()
|
||||
{
|
||||
if (info_.empty()) {
|
||||
@@ -448,7 +460,7 @@ private:
|
||||
return info_.front().cmds == 0;
|
||||
}
|
||||
|
||||
// Returns true when the next request can be writen.
|
||||
// Returns true when the next request can be written.
|
||||
bool on_cmd(command_info_type)
|
||||
{
|
||||
BOOST_ASSERT(!info_.empty());
|
||||
@@ -477,7 +489,7 @@ private:
|
||||
}
|
||||
|
||||
// Connects the socket to one of the endpoints in endpoints_ and
|
||||
// stores the successful enpoint in endpoint_.
|
||||
// stores the successful endpoint in endpoint_.
|
||||
template <class CompletionToken = default_completion_token_type>
|
||||
auto
|
||||
async_connect(CompletionToken&& token = default_completion_token_type{})
|
||||
@@ -488,6 +500,16 @@ private:
|
||||
>(detail::connect_op<client>{this}, token, write_timer_.get_executor());
|
||||
}
|
||||
|
||||
template <class CompletionToken = default_completion_token_type>
|
||||
auto
|
||||
async_read_until(CompletionToken&& token = default_completion_token_type{})
|
||||
{
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(detail::read_until_op<client>{this}, token, read_timer_.get_executor());
|
||||
}
|
||||
|
||||
// Reads a complete resp3 response from the socket using the
|
||||
// timeout config::read_timeout. On a successful read calls
|
||||
// on_read_ or on_push_ depending on whether the response is a push
|
||||
@@ -562,7 +584,7 @@ private:
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(detail::ping_op<client, Command>{this}, token, read_timer_);
|
||||
>(detail::ping_after_op<client, Command>{this}, token, read_timer_);
|
||||
}
|
||||
|
||||
template <class CompletionToken = default_completion_token_type>
|
||||
@@ -572,7 +594,7 @@ private:
|
||||
return boost::asio::async_compose
|
||||
< CompletionToken
|
||||
, void(boost::system::error_code)
|
||||
>(detail::wait_data_op<client>{this}, token, read_timer_);
|
||||
>(detail::wait_for_data_op<client>{this}, token, read_timer_);
|
||||
}
|
||||
|
||||
template <class CompletionToken = default_completion_token_type>
|
||||
@@ -636,21 +658,22 @@ private:
|
||||
// Called when a complete push message is received.
|
||||
push_handler_type on_push_;
|
||||
|
||||
// Called by the parser after each new chunck of resp3 data is
|
||||
// Called by the parser after each new chunk of resp3 data is
|
||||
// processed.
|
||||
resp3_handler_type on_resp3_;
|
||||
|
||||
// Buffer used by the read operations.
|
||||
std::string read_buffer_;
|
||||
|
||||
// Requests payload.
|
||||
// Requests payload and its serializer.
|
||||
std::string requests_;
|
||||
serializer<std::string> sr_;
|
||||
|
||||
// The commands contained in the requests.
|
||||
std::vector<command_info_type> commands_;
|
||||
|
||||
// Info about the requests.
|
||||
boost::container::static_vector<info, 2> info_;
|
||||
std::vector<info> info_;
|
||||
|
||||
// Last time we received data.
|
||||
time_point_type last_data_;
|
||||
@@ -667,10 +690,8 @@ private:
|
||||
// See async_resolve.
|
||||
boost::asio::ip::tcp::resolver::results_type endpoints_;
|
||||
|
||||
// Host passed to async_run.
|
||||
// Host and port passed to async_run.
|
||||
boost::string_view host_;
|
||||
|
||||
// Port passed to async_run.
|
||||
boost::string_view port_;
|
||||
};
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ namespace detail {
|
||||
#include <boost/asio/yield.hpp>
|
||||
|
||||
template <class Client, class Command>
|
||||
struct ping_op {
|
||||
struct ping_after_op {
|
||||
Client* cli;
|
||||
boost::asio::coroutine coro;
|
||||
|
||||
@@ -36,13 +36,13 @@ struct ping_op {
|
||||
void
|
||||
operator()(Self& self, boost::system::error_code ec = {})
|
||||
{
|
||||
reenter (coro) {
|
||||
reenter (coro)
|
||||
{
|
||||
BOOST_ASSERT((cli->cfg_.idle_timeout / 2) != std::chrono::seconds{0});
|
||||
cli->read_timer_.expires_after(cli->cfg_.idle_timeout / 2);
|
||||
yield cli->read_timer_.async_wait(std::move(self));
|
||||
if (ec) {
|
||||
// operation_aborted: ok, not an error.
|
||||
self.complete({});
|
||||
self.complete(ec);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ struct ping_op {
|
||||
};
|
||||
|
||||
template <class Client>
|
||||
struct wait_data_op {
|
||||
struct read_until_op {
|
||||
Client* cli;
|
||||
boost::asio::coroutine coro;
|
||||
|
||||
@@ -63,21 +63,46 @@ struct wait_data_op {
|
||||
, boost::system::error_code ec = {}
|
||||
, std::size_t n = 0)
|
||||
{
|
||||
reenter (coro) {
|
||||
// Detached.
|
||||
cli->async_ping_after([](boost::system::error_code ec){});
|
||||
|
||||
reenter (coro)
|
||||
{
|
||||
// Waits for incomming data.
|
||||
yield boost::asio::async_read_until(*cli->socket_, boost::asio::dynamic_buffer(cli->read_buffer_, cli->cfg_.max_read_size), "\r\n", std::move(self));
|
||||
yield
|
||||
boost::asio::async_read_until(
|
||||
*cli->socket_,
|
||||
boost::asio::dynamic_buffer(cli->read_buffer_, cli->cfg_.max_read_size),
|
||||
"\r\n",
|
||||
std::move(self));
|
||||
|
||||
// Cancels the async_ping_after.
|
||||
cli->read_timer_.cancel();
|
||||
if (ec) {
|
||||
self.complete(ec);
|
||||
return;
|
||||
}
|
||||
self.complete(ec);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.complete({});
|
||||
template <class Client>
|
||||
struct wait_for_data_op {
|
||||
Client* cli;
|
||||
boost::asio::coroutine coro;
|
||||
|
||||
template <class Self>
|
||||
void operator()( Self& self
|
||||
, std::array<std::size_t, 2> order = {}
|
||||
, boost::system::error_code ec1 = {}
|
||||
, boost::system::error_code ec2 = {})
|
||||
{
|
||||
reenter (coro)
|
||||
{
|
||||
yield
|
||||
boost::asio::experimental::make_parallel_group(
|
||||
[this](auto token) { return cli->async_read_until(token);},
|
||||
[this](auto token) { return cli->async_ping_after(token);}
|
||||
).async_wait(
|
||||
boost::asio::experimental::wait_for_all(),
|
||||
std::move(self));
|
||||
|
||||
// The order of completion is not important.
|
||||
self.complete(ec1);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -90,8 +115,8 @@ struct check_idle_op {
|
||||
template <class Self>
|
||||
void operator()(Self& self, boost::system::error_code ec = {})
|
||||
{
|
||||
reenter (coro) for(;;) {
|
||||
|
||||
reenter (coro) for(;;)
|
||||
{
|
||||
cli->check_idle_timer_.expires_after(cli->cfg_.idle_timeout);
|
||||
yield cli->check_idle_timer_.async_wait(std::move(self));
|
||||
if (ec) {
|
||||
@@ -122,7 +147,8 @@ struct resolve_op {
|
||||
, boost::system::error_code ec = {}
|
||||
, boost::asio::ip::tcp::resolver::results_type res = {})
|
||||
{
|
||||
reenter (coro) {
|
||||
reenter (coro)
|
||||
{
|
||||
yield
|
||||
cli->resv_.async_resolve(cli->host_.data(), cli->port_.data(), std::move(self));
|
||||
if (ec) {
|
||||
@@ -147,7 +173,8 @@ struct connect_op {
|
||||
, boost::system::error_code ec = {}
|
||||
, boost::asio::ip::tcp::endpoint const& ep = {})
|
||||
{
|
||||
reenter (coro) {
|
||||
reenter (coro)
|
||||
{
|
||||
yield
|
||||
boost::asio::async_connect(
|
||||
*cli->socket_,
|
||||
@@ -176,8 +203,8 @@ struct init_op {
|
||||
, boost::system::error_code ec1 = {}
|
||||
, boost::system::error_code ec2 = {})
|
||||
{
|
||||
reenter (coro) {
|
||||
|
||||
reenter (coro)
|
||||
{
|
||||
// Tries to resolve with a timeout. We can use the writer
|
||||
// timer here as there is no ongoing write operation.
|
||||
cli->write_timer_.expires_after(cli->cfg_.resolve_timeout);
|
||||
@@ -263,8 +290,8 @@ struct read_write_check_op {
|
||||
, boost::system::error_code ec2 = {}
|
||||
, boost::system::error_code ec3 = {})
|
||||
{
|
||||
reenter (coro) {
|
||||
|
||||
reenter (coro)
|
||||
{
|
||||
// Starts the reader and writer ops.
|
||||
cli->wait_write_timer_.expires_at(std::chrono::steady_clock::time_point::max());
|
||||
|
||||
@@ -307,8 +334,8 @@ struct run_op {
|
||||
template <class Self>
|
||||
void operator()(Self& self, boost::system::error_code ec = {})
|
||||
{
|
||||
reenter (coro) {
|
||||
|
||||
reenter (coro)
|
||||
{
|
||||
yield cli->async_init(std::move(self));
|
||||
if (ec) {
|
||||
self.complete(ec);
|
||||
@@ -338,8 +365,8 @@ struct write_op {
|
||||
, std::size_t n = 0
|
||||
, boost::system::error_code ec2 = {})
|
||||
{
|
||||
reenter (coro) {
|
||||
|
||||
reenter (coro)
|
||||
{
|
||||
BOOST_ASSERT(!cli->info_.empty());
|
||||
BOOST_ASSERT(cli->info_.front().size != 0);
|
||||
BOOST_ASSERT(!cli->requests_.empty());
|
||||
@@ -398,7 +425,8 @@ struct writer_op {
|
||||
template <class Self>
|
||||
void operator()(Self& self , boost::system::error_code ec = {})
|
||||
{
|
||||
reenter (coro) for (;;) {
|
||||
reenter (coro) for (;;)
|
||||
{
|
||||
yield cli->async_write(std::move(self));
|
||||
if (ec) {
|
||||
cli->socket_->close();
|
||||
@@ -428,7 +456,8 @@ struct read_op {
|
||||
, std::size_t n = 0
|
||||
, boost::system::error_code ec2 = {})
|
||||
{
|
||||
reenter (coro) {
|
||||
reenter (coro)
|
||||
{
|
||||
cli->read_timer_.expires_after(cli->cfg_.read_timeout);
|
||||
|
||||
yield
|
||||
@@ -483,10 +512,10 @@ struct reader_op {
|
||||
, boost::system::error_code ec = {}
|
||||
, std::size_t n = 0)
|
||||
{
|
||||
reenter (coro) for (;;) {
|
||||
|
||||
boost::ignore_unused(n);
|
||||
boost::ignore_unused(n);
|
||||
|
||||
reenter (coro) for (;;)
|
||||
{
|
||||
if (cli->read_buffer_.empty()) {
|
||||
yield cli->async_wait_for_data(std::move(self));
|
||||
if (ec) {
|
||||
@@ -498,13 +527,14 @@ struct reader_op {
|
||||
|
||||
BOOST_ASSERT(!cli->read_buffer_.empty());
|
||||
cli->type_ = resp3::to_type(cli->read_buffer_.front());
|
||||
cli->cmd_info_ = std::make_pair<>(Command::invalid, 0);
|
||||
cli->cmd_info_ = std::make_pair(Command::invalid, 0);
|
||||
if (cli->type_ != resp3::type::push) {
|
||||
BOOST_ASSERT(!cli->commands_.empty());
|
||||
cli->cmd_info_ = cli->commands_.front();
|
||||
}
|
||||
|
||||
cli->last_data_ = std::chrono::steady_clock::now();
|
||||
|
||||
yield cli->async_read(std::move(self));
|
||||
if (ec) {
|
||||
cli->on_reader_exit();
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
namespace aedis {
|
||||
namespace generic {
|
||||
|
||||
/** \brief Errors from the generic module.
|
||||
/** \brief Generic errors.
|
||||
* \ingroup any
|
||||
*/
|
||||
enum class error
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
namespace aedis {
|
||||
namespace generic {
|
||||
|
||||
/** @brief Creates a Redis request from user data.
|
||||
/** @brief Creates Redis requests from user data.
|
||||
* \ingroup any
|
||||
*
|
||||
* A request is composed of one or more redis commands and is
|
||||
|
||||
@@ -448,9 +448,7 @@ char const* to_string(command c);
|
||||
*/
|
||||
std::ostream& operator<<(std::ostream& os, command c);
|
||||
|
||||
/** \brief Checks whether a command has push response.
|
||||
* \ingroup any
|
||||
*/
|
||||
// Checks whether a command has push response.
|
||||
bool has_push_response(command cmd);
|
||||
|
||||
} // redis
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
namespace aedis {
|
||||
namespace resp3 {
|
||||
|
||||
/** \brief RESP3 parsing errors.
|
||||
/** \brief RESP3 errors.
|
||||
* \ingroup any
|
||||
*/
|
||||
enum class error
|
||||
|
||||
@@ -21,14 +21,6 @@ namespace resp3 {
|
||||
* Redis responses are the pre-order view of the response tree (see
|
||||
* https://en.wikipedia.org/wiki/Tree_traversal#Pre-order,_NLR).
|
||||
*
|
||||
* The node class represent one element in the response tree. The
|
||||
* string type is a template to give more flexibility. The library
|
||||
* uses
|
||||
*
|
||||
* @li @c boost::string_view
|
||||
* @li @c std::string
|
||||
* @li @c boost::static_string
|
||||
*
|
||||
* \remark Any Redis response can be received in an array of nodes,
|
||||
* for example \c std::vector<node<std::string>>.
|
||||
*/
|
||||
|
||||
@@ -34,12 +34,10 @@ namespace resp3 {
|
||||
* For a complete example see examples/intro_sync.cpp. This function
|
||||
* is implemented in terms of one or more calls to @c
|
||||
* asio::read_until and @c asio::read functions, and is known as a @a
|
||||
* composed @a operation.
|
||||
*
|
||||
* Furthermore, the implementation may read additional bytes from the
|
||||
* stream that lie past the end of the message being read. These
|
||||
* additional bytes are stored in the dynamic buffer, which must be
|
||||
* preserved for subsequent reads.
|
||||
* composed @a operation. Furthermore, the implementation may read
|
||||
* additional bytes from the stream that lie past the end of the
|
||||
* message being read. These additional bytes are stored in the
|
||||
* dynamic buffer, which must be preserved for subsequent reads.
|
||||
*
|
||||
* \param stream The stream from which to read e.g. a tcp socket.
|
||||
* \param buf Dynamic buffer (version 2).
|
||||
@@ -144,12 +142,11 @@ read(
|
||||
* For a complete example see examples/transaction.cpp. This function
|
||||
* is implemented in terms of one or more calls to @c
|
||||
* asio::async_read_until and @c asio::async_read functions, and is
|
||||
* known as a @a composed @a operation.
|
||||
*
|
||||
* Furthermore, the implementation may read additional bytes from the
|
||||
* stream that lie past the end of the message being read. These
|
||||
* additional bytes are stored in the dynamic buffer, which must be
|
||||
* preserved for subsequent reads.
|
||||
* known as a @a composed @a operation. Furthermore, the
|
||||
* implementation may read additional bytes from the stream that lie
|
||||
* past the end of the message being read. These additional bytes are
|
||||
* stored in the dynamic buffer, which must be preserved for
|
||||
* subsequent reads.
|
||||
*
|
||||
* \param stream The stream from which to read e.g. a tcp socket.
|
||||
* \param buffer Dynamic buffer (version 2).
|
||||
|
||||
@@ -69,18 +69,12 @@ char const* to_string(type t);
|
||||
*/
|
||||
std::ostream& operator<<(std::ostream& os, type t);
|
||||
|
||||
/** \brief Checks whether the data type is an aggregate.
|
||||
* \ingroup any
|
||||
* \param t RESP3 data type.
|
||||
/* Checks whether the data type is an aggregate.
|
||||
*/
|
||||
bool is_aggregate(type t);
|
||||
|
||||
/** @brief Checks the data type multilicity.
|
||||
* \ingroup any
|
||||
* \param t RESP3 type.
|
||||
* \returns For map and attribute data types this function returns 2.
|
||||
* All other types have value 1.
|
||||
*/
|
||||
// For map and attribute data types this function returns 2. All
|
||||
// other types have value 1.
|
||||
std::size_t element_multiplicity(type t);
|
||||
|
||||
// Returns the wire code of a given type.
|
||||
|
||||
@@ -69,9 +69,7 @@ char const* to_string(command c);
|
||||
*/
|
||||
std::ostream& operator<<(std::ostream& os, command c);
|
||||
|
||||
/** \brief Checks whether a command has push response.
|
||||
* \ingroup any
|
||||
*/
|
||||
// Checks whether a command has push response.
|
||||
bool has_push_response(command cmd);
|
||||
|
||||
} // sentinel
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
AC_PREREQ([2.69])
|
||||
AC_INIT([Aedis], [0.1.1], [mzimbres@gmail.com])
|
||||
AC_INIT([Aedis], [0.1.2], [mzimbres@gmail.com])
|
||||
AC_CONFIG_MACRO_DIR([m4])
|
||||
#AC_CONFIG_SRCDIR([src/aedis.cpp])
|
||||
AC_CONFIG_HEADERS([config.h])
|
||||
|
||||
@@ -24,7 +24,6 @@ using aedis::redis::command;
|
||||
using aedis::generic::client;
|
||||
using aedis::user_session;
|
||||
using aedis::user_session_base;
|
||||
|
||||
using client_type = client<net::ip::tcp::socket, command>;
|
||||
using response_type = std::vector<node<std::string>>;
|
||||
|
||||
@@ -40,8 +39,10 @@ public:
|
||||
adapter_(nd, ec);
|
||||
}
|
||||
|
||||
void on_read(command cmd, std::size_t)
|
||||
void on_read(command cmd, std::size_t n)
|
||||
{
|
||||
std::cout << "on_read: " << cmd << " " << n << std::endl;
|
||||
|
||||
switch (cmd) {
|
||||
case command::ping:
|
||||
if (resp_.front().value != "PONG") {
|
||||
@@ -77,6 +78,22 @@ private:
|
||||
std::queue<std::shared_ptr<user_session_base>> sessions_;
|
||||
};
|
||||
|
||||
net::awaitable<void>
|
||||
run_with_reconnect(std::shared_ptr<client_type> db)
|
||||
{
|
||||
auto ex = co_await net::this_coro::executor;
|
||||
|
||||
boost::asio::steady_timer timer{ex};
|
||||
|
||||
for (boost::system::error_code ec;;) {
|
||||
co_await db->async_run("127.0.0.1", "6379",
|
||||
net::redirect_error(net::use_awaitable, ec));
|
||||
|
||||
timer.expires_after(std::chrono::seconds{2});
|
||||
co_await timer.async_wait(net::redirect_error(net::use_awaitable, ec));
|
||||
}
|
||||
}
|
||||
|
||||
net::awaitable<void>
|
||||
listener(
|
||||
std::shared_ptr<net::ip::tcp::acceptor> acc,
|
||||
@@ -108,10 +125,7 @@ int main()
|
||||
auto db = std::make_shared<client_type>(ioc.get_executor());
|
||||
auto recv = std::make_shared<receiver>(db);
|
||||
db->set_receiver(recv);
|
||||
|
||||
// TODO: Close the listener when async_run returns.
|
||||
db->async_run("127.0.0.1", "6379",
|
||||
[](auto ec){ std::cout << ec.message() << std::endl;});
|
||||
co_spawn(ioc, run_with_reconnect(db), net::detached);
|
||||
|
||||
auto endpoint = net::ip::tcp::endpoint{net::ip::tcp::v4(), 55555};
|
||||
auto acc = std::make_shared<net::ip::tcp::acceptor>(ioc.get_executor(), endpoint);
|
||||
|
||||
@@ -382,6 +382,7 @@ public:
|
||||
|
||||
void on_write(std::size_t)
|
||||
{
|
||||
std::cout << "on_write" << std::endl;
|
||||
if (!std::exchange(sent_, true)) {
|
||||
db_->send(command::del, "key");
|
||||
db_->send(command::client, "PAUSE", 5000);
|
||||
|
||||
Reference in New Issue
Block a user