mirror of
https://github.com/boostorg/cobalt.git
synced 2026-01-19 16:12:15 +00:00
142 lines
4.1 KiB
Plaintext
142 lines
4.1 KiB
Plaintext
== price ticker
|
|
|
|
To demonstrate `channels` and other tools, we need a certain complexity.
|
|
For that purpose our project is a price ticker, that connects to
|
|
https://blockchain.info. A user can then connection to localhost
|
|
to query a given currency pair, like this:
|
|
|
|
[source,bash]
|
|
----
|
|
wscat -c localhost:8080/btc/usd
|
|
----
|
|
|
|
First we do the same declarations as echo-server.
|
|
|
|
.example/ticker.cpp declarations
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=decls]
|
|
----
|
|
|
|
The next step is to write a function to connect an ssl-stream,
|
|
to connect upstream:
|
|
|
|
.example/ticker.cpp connect
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=connect]
|
|
----
|
|
<1> Lookup the host
|
|
<2> Connect to the endpoint
|
|
<3> Do the ssl handshake
|
|
<4> Return the socket to the caller
|
|
|
|
Next, we'll need a function to do the websocket upgrade
|
|
on an existing ssl-stream.
|
|
|
|
.example/ticker.cpp connect_to_blockchain_info
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=ws_upgrade]
|
|
----
|
|
<1> `blockchain.info` requires this header to be set.
|
|
<2> Perform the websocket handshake.
|
|
|
|
Once the websocket is connected, we want to continuously receive json messages,
|
|
for which a generator is a good choice.
|
|
|
|
.example/ticker.cpp json_read
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=json_reader]
|
|
----
|
|
<1> Keep running as long as the socket is open
|
|
<2> Read a frame from the websocket
|
|
<3> Parse & `co_yield` it as an object.
|
|
|
|
This then needs to be connected to subscriber, for which we'll utilize channels to pass raw json.
|
|
To make life-time management easy, the subscriber will hold a `shared_ptr`, and the producer a `weak_ptr`.
|
|
|
|
.example/ticker.cpp subscription types
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=subscription_types]
|
|
----
|
|
|
|
The main function running the blockchain connector, operates on two inputs:
|
|
data coming from the websocket and a channel to handle new subscriptions.
|
|
|
|
.example/ticker.cpp run blockchain_info
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=run_blockchain_info]
|
|
----
|
|
<1> Initialize the connection
|
|
<2> Instantiate the json_reader
|
|
<3> Run as long as the websocket is open
|
|
<4> Select, i.e. wait for either a new json message or subscription
|
|
<5> When it's a json handle an update or a rejection
|
|
<6> Handle new subscription messages
|
|
|
|
The `handle_*` function's contents are not as important for the `cobalt` functionality,
|
|
so it's skipped in this tutorial.
|
|
|
|
The `handle_new_subscription` function sends a message to the `blockchain.info`,
|
|
which will send a confirmation or rejection back.
|
|
The `handle_rejection` and `handle_update` will take the json values
|
|
and forward them to the subscription channel.
|
|
|
|
On the consumer side, our server will just forward data to the client.
|
|
If the client inputs data, we'll close the websocket immediately.
|
|
We're using `as_tuple` to ignore potential errors.
|
|
|
|
.example/ticker.cpp read and close
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=read_and_close]
|
|
----
|
|
|
|
Next, we're running the session that the users sends
|
|
|
|
.example/ticker.cpp run_session
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=run_session]
|
|
----
|
|
<1> Read the http request, because we want the path
|
|
<2> Check the path, e.g. `/btc/usd`.
|
|
<3> Accept the websocket
|
|
<4> Start reading & close if the consumer sends something
|
|
<5> Create the channel to receive updates
|
|
<6> Send a subscription requests to `run_blockchain_info`
|
|
<7> While the channel & websocket are open, we're forwarding data.
|
|
<8> Close the socket & ignore the error
|
|
<9> Since the websocket is surely closed by now, wait for the read_and_close to close.
|
|
|
|
With `run_session` and `run_blockchain_info` written, we can not move on to main:
|
|
|
|
.example/ticker.cpp main
|
|
[example]
|
|
[source, cpp]
|
|
----
|
|
include::../../example/ticker.cpp[tag=main]
|
|
----
|
|
<1> Create the channel to manage subscriptions
|
|
<2> Use `join` to run both tasks in parallel.
|
|
<3> Use an cobalt scope to provide a `wait_group`.
|
|
<4> Run until cancelled.
|
|
<5> When we've reached the `limit` we wait for one task to complete.
|
|
<6> Wait for a new connection.
|
|
<7> Insert the session into the `wait_group`.
|
|
|
|
Main is using `join` because one task failing should cancel the other one.
|
|
|