Tutorial: Websocket Source (Coinbase API)
This tutorial builds a custom `Source` connector named `CoinbaseSource` that ingests ticker updates from the Coinbase Websocket API for processing with a `StreamingDataFrame`.
Specifically, it showcases how to use the Quix Streams connector framework to create and use a customized `Source` (there are also `Sink` connectors!).
Outline of the Problem
We want to track various Bitcoin prices for real-time analysis, but first we need to get the data into Kafka.
Our Example
This example showcases:
- Extending the Quix Streams `Source` class to read from the Coinbase Websocket API.
- Using the new extension (`CoinbaseSource`).
Before Getting Started
- You will see links scattered throughout this tutorial.
    - Tutorial code links are marked >>> LIKE THIS <<<.
    - All other links provided are completely optional.
    - They are great ways to learn more about various concepts if you need them!
- This tutorial uses a `Source` rather than a Kafka `Topic` to ingest data.
    - `Source` connectors enable reading data from a non-Kafka origin (typically to get it into Kafka).
    - This approach saves users from having to run a producer alongside the `Application`.
    - A `Source` is easily replaced with an actual Kafka topic (just pass a `Topic` instead of a `Source`).
Creating CoinbaseSource
First, let's take a detailed look at `CoinbaseSource` in our >>> Coinbase Application <<< to understand what modifications to `Source` were necessary.
[!NOTE] Check out the custom Source docs for additional details around what can be adjusted.
Setting up Source.run()
A `Source` requires defining a `.run()` method, which should perform a data retrieval and produce loop (using the `Source.serialize()` and `Source.produce()` methods) within a `while Source.running` block.
Let's take a look at `CoinbaseSource`'s `.run()` in detail.
Setting up the API Connection
First, we establish the connection.
```python
# `connect` comes from the `websockets` library; `json` is from the standard library
ws_conn = connect(self._url)

# subscribe to the "ticker" channel for our configured product IDs
subscribe_payload = {
    "type": "subscribe",
    "channels": [
        {"name": "ticker", "product_ids": self._product_ids},
    ],
}
ws_conn.send(json.dumps(subscribe_payload))
```
Data retrieval loop
Now we set up the data retrieval loop, contained within a `while self.running` block. This is so a shutdown from the `Application` level also gracefully exits this loop; the `Source` essentially stops if the `Source.run()` method is ever exited.
Note
Since no other teardown is required for websockets, nothing happens after the `while self.running` block.
Inside this block, records are retrieved, serialized (to JSON), and produced to an underlying internal topic in as close to their raw form as possible (user-level manipulations occur at the `Application` level using a `StreamingDataFrame`).
Tip
The internal topic can accept other data serializations by overriding `Source.default_topic()`.
Using CoinbaseSource
Now that `CoinbaseSource` exists, we can ingest raw data from Coinbase. Of course, each user will have their own desired product IDs and transformations to apply.

Now let's go over the `main()` portion of our >>> Coinbase Application <<< in detail!
Define the Source
First, set up a `CoinbaseSource` with our desired `product_ids`. Be sure to provide a unique name, since it affects the internal topic name.
```python
coinbase_source = CoinbaseSource(
    name="coinbase-source",
    url="wss://ws-feed-public.sandbox.exchange.coinbase.com",
    product_ids=["ETH-BTC"],
)
```
Create an Application
Create a Quix Streams `Application`, which is our constructor for everything! We provide it our connection settings, consumer group (ideally unique per `Application`), and where the consumer group should start from on the (internal) `Source` topic.
Tip
Once you are more familiar with Kafka, we recommend learning more about `auto_offset_reset`.
Our Application
```python
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",  # your Kafka broker address here
    consumer_group="coinbase-tutorial",  # any unique name works
    auto_offset_reset="earliest",
)
```
Specify Topics
`Application.topic()` returns `Topic` objects, which are used by `StreamingDataFrame`. Create one for each topic used by your `Application`.
Note
Any missing topics will be automatically created for you upon running an `Application`.
Our Topics
We have one output topic, named `price_updates`:
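The topic definition itself is not reproduced here; judging from how it is used later (`sdf.to_topic(price_updates_topic)`), it is presumably along these lines (the variable name is an assumption):

```python
price_updates_topic = app.topic(name="price_updates")
```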
The StreamingDataFrame (SDF)
Now for the fun part: building our `StreamingDataFrame`, often shortened to "SDF".

SDF allows manipulating the message value in a dataframe-like fashion using various operations. After initializing with either a `Topic` or `Source`, we continue reassigning to the same `sdf` variable as we add operations.
Note
A few `StreamingDataFrame` operations are "in-place", like `.print()`.
Our SDF operations
First, we initialize our SDF with our `coinbase_source`. Then, our SDF prints each record to the console and produces only the price and 24-hour volume to our outgoing topic.
```python
sdf = app.dataframe(source=coinbase_source)
sdf.print()  # in-place: prints each record as it is processed
sdf = sdf[["price", "volume_24h"]]  # keep only these two columns
sdf.to_topic(price_updates_topic)
```
Example record
As an example, a record processed by our `StreamingDataFrame` would print the following:
```python
{ 'value': { 'type': 'ticker',
             'sequence': 754296790,
             'product_id': 'ETH-BTC',
             'price': '0.00005',
             'open_24h': '0.00008',
             'volume_24h': '322206074.45925051',
             'low_24h': '0.00005',
             'high_24h': '0.00041',
             'volume_30d': '3131937035.46099349',
             'best_bid': '0.00001',
             'best_bid_size': '1000000000.00000000',
             'best_ask': '0.00006',
             'best_ask_size': '166668.66666667',
             'side': 'sell',
             'time': '2024-09-19T10:01:26.411029Z',
             'trade_id': 28157206,
             'last_size': '16666.86666667'}}
```
and then produce the following to topic `price_updates`:
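The produced value is not reproduced above, but it follows directly from the column selection `sdf[["price", "volume_24h"]]`: only those two fields of the value survive. A plain-dict illustration of that reduction (ordinary Python, not Quix Streams code):

```python
# the example ticker record from above, abbreviated to the relevant fields
record = {
    "type": "ticker",
    "product_id": "ETH-BTC",
    "price": "0.00005",
    "volume_24h": "322206074.45925051",
    # ...remaining fields omitted for brevity
}

# sdf[["price", "volume_24h"]] keeps only the listed columns of the value
produced_value = {column: record[column] for column in ("price", "volume_24h")}
print(produced_value)
# {'price': '0.00005', 'volume_24h': '322206074.45925051'}
```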
Running the Application
Running a `Source`-based `Application` requires calling `Application.run()` within an `if __name__ == "__main__"` block.
Our Application Run Block
Our entire `Application` (and all its spawned objects) resides within a `main()` function, executed as required:
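The run block is not reproduced here; in outline it looks like this (the object construction shown in the previous sections is elided):

```python
def main():
    # construct the Source, Application, topics, and StreamingDataFrame
    # (as shown in the sections above), then:
    app.run()

if __name__ == "__main__":
    main()
```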
This `main()` setup is a personal choice; the only true requirement is that `app.run()` is called inside an `if __name__ == "__main__"` block.
Try it Yourself!
1. Run Kafka
First, have a running Kafka cluster.
To easily run a broker locally with Docker, just run this simple one-liner.
2. Download files
3. Install requirements
In your desired Python environment, execute: `pip install -r requirements.txt`
4. Run the application
In your desired Python environment, execute: `python tutorial_app.py`
5. Check out the results!
You should see record printouts like the example above.