back
February 8, 2023
|
Tutorials

Build a simple event-driven system to get ML predictions with Python and Apache Kafka

Use the Quix Streams Python library to continuously stream email records from a CSV file, get an ML inference for each record, then stream the results back to a new Kafka topic.

ML predictions with Python and Apache Kafka black background.
Quix Streams is a fast and general-purpose processing framework for streaming data. Build real-time applications and analytics systems on data streams using Python DataFrames and stateful operators, all without having to install a server-side engine.

About this tutorial

If you’ve ever interacted with ML models in the cloud, you might notice that it’s often done through a REST API (the GPT-3 API is currently a popular example). However, a REST API usually isn’t the best choice when working with high-volumes or continuous streams of data. Most of the time, it’s better to use an event-based system such as Apache Kafka. We’ll look at the theory behind this in a moment, but you’ll understand it faster if you try it out for yourself. We’ve created a simple tutorial to show you how to generate real-time ML predictions using Apache Kafka to manage the flow of data.

What you’ll learn

  • How to stream raw data to Kafka from a CSV
  • How to read the data from Kafka and send it to an ML model for inference
  • How to output the inference results back into Kafka for downstream processing

What you should know already

This article is intended for data scientists and engineers, so we’re assuming the following things about you:

You know your way around Python and have worked with the Pandas library

You’ve heard of Apache Kafka and know roughly what it’s for

But don't worry if you don’t meet these criteria. You can always refer to the Kafka documentation for more details on any concepts or features. Plus, this tutorial is simple enough to follow along, and we’ll briefly explain these technologies. Just be aware that it’s not intended for absolute beginners.

What You’ll be building

In effect, you’ll create a system on your local machine that resembles the following architecture:  

Kafka ML and mail architecture scheme.

In this architecture, an email server is interacting with a machine learning server to get spam predictions. Kafka is in the middle managing the data flow.

The email server sends a stream of emails to the ML server via Kafka

For each email, the ML server returns a prediction about whether the email is spam or “ham” (a normal email)

It sends the predictions back to the email server, also via Kafka

The mail server can then update the spam label in its email database

Now, if you want to get straight to the tutorial you can skip the preamble, but we thought it might be useful to explain why we think Kafka is a great tool for ML and Python development.

Why Use Kafka to orchestrate data for ML predictions?

Because it’s generally more efficient and fault-tolerant. Pipelines like the one in our example usually have multiple processing steps that run at different speeds. This means that the underlying system needs to be efficient at queuing the data that is waiting for the next step. If one part of the pipeline is “clogged”, the whole system should be able to adapt gracefully. This is a challenge that Kafka handles much better than a system based on REST APIs

For example, suppose that your ML server chokes because there was a memory leak. What would happen?

How a REST-based system would react

In a REST-based system, the mail server will keep calling the ML server, asking “Hey, is this email spam or what?.. you there? 
.please answer!”. But the mail server will keep getting the equivalent of an engaged dial tone. This ties up the mail server’s resources because it must keep trying until it gets an answer. For each email, it tries for a while, gives up, then moves on the next email. This means that many spam emails will slip through unfiltered until the ML service is back online.

How a Kafka-based system would react

Kafka, on the other hand, patiently stores all the emails it has received, accumulating a backlog of work for the dormant ML server. As soon as the ML server comes back online, the server has a stack of unfinished paperwork on its desk (metaphorically speaking). It must give predictions for the backlog of unprocessed emails before it can start working on the latest ones. And as far as the mail server is concerned, it’s business as usual. It waits for the ML server to hand over its predictions rather than having to repeatedly ask for them. Whenever it receives a new prediction, it updates its database. Granted, the backlog means that it may take longer for an email to be marked as spam than usual, but it’s better than spam not being identified at all. Kafka’s scalable architecture also enables you to give more resources to your ML service and get through the backlog faster.

Why use Python with Apache Kafka?

Because data and ML teams are generally more proficient in Python. They sometimes struggle with Kafka because most older Kafka tutorials are written for software engineers who write in Java. This is because software engineers have traditionally built the components that interact with Kafka (and Kafka itself is written in Java/Scala).

Kafka’s user base is changing however. The responsibilities of software and data teams are beginning to converge. This has been driven by the growing importance of data in modern organizations and the increasing complexity of data management and processing tasks. Today, data professionals also contribute to software components that interact with Kafka—but they still encounter a lot of friction because Java is generally not part of their skillset. That’s why we’re using Python in these examples.

Ok, that’s it for the preamble - let's get into the tutorial.

Prerequisites for this tutorial

You'll need about 30 minutes to complete the steps (once you have the required software installed).

Make sure that you have the following software installed before you proceed any further.

| **SOFTWARE** | **NOTE** | |---|---| | Windows or a Linux-based operating system | We’ll be providing commands for both Windows and Unix-based operating systems such as macOS and Ubuntu. | | Python 3.0+ and required libraries |

You can download the installer from the Python downloads page.

Optionally, you might want to create a virtual environment to use for this tutorial so that you avoid conflicting dependencies. 

Required Libraries

  • Pandas: Install with pip3 install pandas
  • Quix Streams: Install with pip3 install quixstreams
  • Tensorflow: Install with pip3 install tensorflow
    If you’re using a Macbook Pro with an M1 chip, you might need to install TensorFlow using the steps outlined in this guide.
  • python-tabulate: Install with pip3 install tabulate
    (*technically, this last one isn't essential - it just lets you pretty-print DataFrames using df.to_markdown(), but it makes reviewing the console output a lot easier)
| | Java 8+ |

This is a prerequisite for Apache Kafka. To install it, choose one of the following:


 

| | Apache Kafka |

You can download the Apache Kafka binary from the Apache Kafka Download page.

Extract the contents of the file to a convenient location. For example, you can create a project folder for this tutorial and extract it there.

|

Major steps

Before we get into the details, let’s go over the major steps that we’ll be covering.

  1. Setting up Kafka: We’ll first get to grips with Kafka’s command line tools, and use them to:
    — Start the Zookeeper and Kafka server.
    — Create a topic (where we’ll be sending our data).

  2. Sending the raw email data to a kafka topic for downstream analysis:  Use the Quix Streams Python library to:
    — Read the csv into a DataFrame and initialize the kafka python producer.
    — Iterate through the rows and send to Kafka.

    We’ll also look at the benefits of using data frames and time series data with Kafka.

  3. Reading each email from a Kafka topic using an ML Model to determine if it’s spam:  You’ll be using the Quix Streams Python library to chain together a consumer and producer and:
    — Load a pre-trained model with Keras and TensorFlow
    — Read the messages from our “emails” Kafka topic, and output a spam prediction

  4. Writing the spam predictions to another Kafka topic For downstread consumption: You’ll write the predictions to a “predictions” topic and look at how downstream processes could use that information.

If you want to jump straight ahead to the code, you can clone the tutorials repo in GitHub with the following command:


git clone https://github.com/quixio/tutorial-code

You'll find the code in the 'ml-predictions-kafka' subfolder.

Using Kafka to orchestrate data for ML predictions

Ok, so you’ve installed the prerequisites, know the steps, and are ready to go—let’s get this done!

Setting up Kafka

Before you can do anything with Kafka, you have to start the core services. Each of these services needs to run in a separate terminal window.

When following these instructions, start each terminal window in the directory where you extracted Kafka (for example “C:\Users\demo\Downloads\kafka_2.13-3.3.1\”)

1. Start up your Kafka servers:

— In the Kafka directory, open a terminal window and enter the following command.

  • Linux / macOS

bin/zookeeper-server-start.sh config/zookeeper.properties

  • Windows

.\bin\windows\zookeeper-server-start.bat.\config\zookeeper.properties


You should see a bunch of log messages indicating the server started successfully. Leave the window open.


— Open a second terminal window and enter the following command:

  • Linux / macOS

bin/zookeeper-server-start.sh config/server.properties

  • Windows

.\bin\windows\kafka-server-start.bat .\config\server.properties


Again, you should see a bunch of log messages indicating the server started successfully and connected to the Zookeeper server. Also leave this window open.

Downloading the data

For this exercise, you’ll be using an adapted version of this sample email data set from Kaggle that was designed for testing spam detection algorithms. I've adapted it so that it more closely resembles real-life time series data, with a timestamp and a unique identifier as well as the message body.
Here's a sample of the first few rows.

| **Timestamp** | **ID** | **Message** | |----------------------------|--------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 2019-01-23 12:00:00 | 62735 | Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat... | | 2019-01-23 12:01:26 | 62736 | Ok lar... Joking wif u oni... | | 2019-01-23 12:02:53 | 62737 | Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's | | 2019-01-23 12:04:19 | 62738 | U dun say so early hor... U c already then say... |


To prepare the data, follow these steps:

  • Create a folder for this project (e.g. “kafka-ml-tutorial”) and download the “spam-timeseries.csv” file from our GitHub repo into the project folder.
  • If you’re on a Linux-based OS, you can perform these steps with the following commands:

mkdir kafka-ml-tutorial
cd kafka-ml-tutorial
curl raw.githubusercontent.com/quixai/tutorial-code/main/ml-predictions-kafka/spam-timeseries.csv -o spam-timeseries.csv

Creating a Kafka producer to send the email text

Now, start sending data to Kafka. You’ll be using the recently-released Quix Streams Python library which is one of several libraries intended to connect python applications to Kafka (others are kafka-python and confluent-kafka-python).  I've chosen the Quix Streams library because it specializes in time-series data and has built-in support for Pandas DataFrames.

To create the Kafka producer, follow these steps:

1. Create a file called producer.py in your project directory and insert the following code block:


# importing packages
import pandas as pd
import time
import datetime as dt
import quixstreams as qx

# Initialize the Quix Streams client
client = qx.KafkaStreamingClient('127.0.0.1:9092')

# Initialize the destination topic
print("Initializing topic")
topic_producer = client.get_topic_producer('emails')
output_stream = topic_producer.create_stream()

print(f'Initialized Quix Streams client at {dt.datetime.utcnow()}')

# Read in the CSV file
df = pd.read_csv("spam-timeseries.csv")

  • This imports all the required libraries and initializes the Quix Streams, telling it to connect to the server that you should (hopefully) still have running on your computer under ‘localhost’.
  • It also creates a Kafka producer to write to a new topic called ‘emails’ (fortunately, you don’t have to create this topic in advance, the library will create it for you).
  • Finally, it reads in the CSV file as a DataFrame.

2. Add the ‘for’ loop that will iterate through the file and send each row to the producer for serialization and stream it into the topic “emails”.


for i in range(len(df)):

    # Create small data frame for each message
    df_r = df.iloc[[i]]

    # Print the message so you can see what is being sent
    print("Sending Message: \n", df_r.to_markdown())

    # Send the data with the Quix Streams client
    output_stream.timeseries.publish(df_r)

    # Optionally wait for half a second to slow down the stream
    # so that we can see what is happening.
    time.sleep(0.5)

3. Run producer.py with the following command:


python3 producer.py

For each message, you should see a DataFrame printed to the console in markdown format like this:

Sending: | | Timestamp | ID | Message | |-----:|:--------------------|-------------:|:----------------------------------------------------------------------------------------------------------------------------------------------------| | 5566 | 2019-01-29 01:35:02 | 193218268301 | REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode | Sending: | | Timestamp | ID | Message | |-----:|:--------------------|-------------:|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------| | 5567 | 2019-01-29 01:36:29 | 193218268302 | This is the 2nd time we have tried 2 contact u. U have won the ÂŁ750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate. |

Loading the Model

Next, you'll use a pretrained model to collect the preprocessed data from the emails Kafka topic and create predictions with it. This model can be stored in a cloud bucket or alternate data store; however, since you are working locally, you can store it in your project directory.

To prepare, follow these steps:

Download the model archive file from this storage location and extract it into your project folder.

If you’re on a Linux-based OS, you can use a command such as the following:


curl  -o spam-model.tar.gz
tar -xzf spam-model.tar.gz -C /kafka-ml-tutorial/model

To load the model:

  • In your project directory, create a file called predictions.py and insert the following code block:

from pandas import DataFrame
import time, json
import datetime as dt
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import numpy as np
import quixstreams as qx
import tensorflow_text as text

model = keras.models.load_model('model')

This imports all the required libraries, including the TensorFlow components which you’ll use to load the model into and interact with it. The first line then loads the model files into memory from the “model” subdirectory.

Creating a consumer and producer to manage data flow

In the same file, continue by adding the Kafka-related functions.

To read the email data and output spam predictions, follow these steps:

1.  Add the code that initializes a consumer and another producer.

— Insert the following code into the predictions.py file, underneath the line that loads the model.


#1 — Initialize the Quix Streams client (for standalone Kafka)
client = qx.KafkaStreamingClient('127.0.0.1:9092')

#2 — Initialize a Quix Streams consumer to read from the emails topic (with some extra commit settings)
print("Initializing consumer...")
commit_settings = qx.CommitOptions()
commit_settings.auto_commit_enabled = False # Make sure we can read the same messages again (for testing)
topic_consumer = client.get_topic_consumer("emails", commit_settings=commit_settings,auto_offset_reset=qx.AutoOffsetReset.Earliest)

#2 — Initialize a Quix Streams producer for sending predictions to the predictions topic
print("Initializing producer...")
topic_producer = client.get_topic_producer('predictions')
output_stream = topic_producer.create_stream()

print(f'Initialized Kafka producer at {dt.datetime.utcnow()})

For the consumer, we’ve added some settings to make testing easier:

  • AutoOffsetReset.Earliest—ensures that the consumer keeps reading from the earliest unread message in the topic.
  • enable_auto_commit=False—ensures that the consumer keeps reading the same messages, regardless of whether it has already seen them.

2. Add the ‘for’ loop that will iterate through all the messages from the “emails” topic:

— Insert the following code into the predictions.py file (underneath the producer code) to get a spam prediction for each email and send the prediction to a “predictions” topic.


def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    print("Processing Message: \n", df.to_markdown(), "\n")

    # Extract the email text from the DataFrame (in a format that the model expects)
    dff = pd.Series(df['Message'])

    # Get a spam prediction for the email text
    inference = (model.predict(dff) > 0.5).astype(int)

    # Create a new message with the MessageID and the SpamFlag
    df["Spamflag"] = inference

    # In Quix Streams, a message always needs a timestamp
    # so we an extra one for the time of the prediction
    df["TimestampInference"] = time.time_ns() // 1_000_000

    # Create a new DataFrame without the email text to reduce data transmission
    df_m = df[['TimestampInference','ID', 'Spamflag']]

    # Publish the spam predictions to the predictions topic
    output_stream.timeseries.publish(df_m)
    print("Prediction sent: \n", df_m.to_markdown(), "\n\n\n")

def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    # Subscribe to new DataFrames being received
    stream_consumer.timeseries.on_dataframe_received = on_dataframe_received_handler


# Subscribe to new streams being received
topic_consumer.on_stream_received = on_stream_received_handler

print("Listening to streams. Press CTRL-C to exit.")

# Handle termination signals and provide a graceful exit
qx.App.run()

3. Now it’s time to test the code—run the predictions.py file with the following command:


python3 predictions.py

You should see console output that resembles the following example:

Processing Message: | | timestamp | ID | Message | |---:|--------------------:|------------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| | 0 | 1548246989000000000 | 1.93218e+11 | Urgent UR awarded a complimentary trip to EuroDisinc Trav, Aco&Entry41 Or £1000. To claim txt DIS to 87121 18+6*£1.50(moreFrmMob. ShrAcomOrSglSuplt)10, LS1 3AJ | 1/1 [==============================] - 0s 136ms/step Prediction sent: | | ID | Spamflag | |---:|------------:|-----------:| | 0 | 1.93218e+11 | 1 | Processing Message: | | timestamp | ID | Message | |---:|--------------------:|------------:|:-------------------------------------------------------------------------------| | 0 | 1548247075000000000 | 1.93218e+11 | Did you hear about the new "Divorce Barbie"? It comes with all of Ken's stuff! | 1/1 [==============================] - 0s 144ms/step Prediction sent: | | ID | Spamflag | |---:|------------:|-----------:| | 0 | 1.93218e+11 | 0 | Processing Message: | | timestamp | ID | Message | |---:|--------------------:|------------:|:-----------------------------------| | 0 | 1548247162000000000 | 1.93218e+11 | I plane to give on this month end. | 1/1 [==============================] - 0s 152ms/step 1/1 [==============================] - 0s 152ms/step Prediction sent: | | ID | Spamflag | |---:|------------:|-----------:| | 0 | 1.93218e+11 | 0 | You’ll see some messages flagged as 1 (spam

You’ll see some messages labeled as 1 (spam) and others as 0 (ham).

As the predictions component is subscribed to the emails topic, it will keep making predictions until it has run through all the available emails. If you keep sending messages to the emails topic, this process can run continuously.

That’s it for now. Kudos for making it this far!

If you’re feeling extra adventurous, you could create an extra consumer.py file and write a consumer to read the spam predictions back into a Data Frame (by adapting the consumer code from this step) and finally into a new CSV.

Now, let’s go through what you did and how it might scale in production.

Wrapping up

First, let's revisit the production architecture that we illustrated at the beginning of this article.

ML workflow final.

In this tutorial you essentially:

  • Used a CSV to replicate a mail server and streamed emails to Kafka with the Quix Streams library.
  • Ran a Kafka server on your local machine
  • Ran a ML model on your machine to imitate an ML service and stream predictions back into another Kafka topic.

Of course, I should qualify that this local demo isn't truly "event-driven". The emails aren't arriving in a realistic fashion so we can't think of each incoming Kafka message as an "email received" event. But it gets you close enough.

How it could scale:

The mail server would of course be running on its own bare-metal or virtual server, and would be streaming mails continuously, perhaps in batches rather than one message at a time.

Kafka itself might be running in a virtual private cloud so that it can scale up and down as needed.

The ML service might be running in a consumer group in multiple replicas that it can also automatically scale up during spikes in demand.

Given all of this horizontal scaling, you would also need some extra Kafka configuration so that messages are processed in an optimal manner, using the full breadth of these horizontally scaled resources.

But that’s enough theory for now. Congratulations on making your first steps towards mastering Apache Kafka with Python. By following the steps outlined in this article, you should now have a good understanding of how to use Kafka to stream data from various sources, process that data, and pipe it through a machine learning model.

Remember, the key to successfully using Kafka for machine learning is to carefully plan your data pipeline and to test and monitor your system as you go. With practice and patience, you will become more and more proficient at using Kafka to power your machine learning projects.

If you have any questions about the tutorial, don't hesitate to reach out to me in The Stream—our open Slack community for real-time data enthusiasts.

What’s a Rich Text element?

The rich text element allows you to create and format headings, paragraphs, blockquotes, images, and video all in one place instead of having to add and format them individually. Just double-click and easily create content.

Static and dynamic content editing

A rich text element can be used with static or dynamic content. For static content, just drop it into any page and begin editing. For dynamic content, add a rich text field to any collection and then connect a rich text element to that field in the settings panel. Voila!

How to customize formatting for each rich text

Headings, paragraphs, blockquotes, figures, images, and figure captions can all be styled after a class is added to the rich text element using the "When inside of" nested selector system.

Related content

Pipeline diagram for data enrichment pipeline
Tutorials

How to enrich a stream of data in real time with Quix and Redis

Learn how to enrich real-time sensor data streams by looking up device coordinates in Redis and appending them to the data stream using Quix.
Steve Rosam
Words by
A data streaming pipeline for creating a heat map. There is an Angular logo next to both frontend applications
Tutorials

Clickstream analytics: creating a user interaction heat map for an e-commerce website

See Quix Streams in action by vizualizing mouse movement patterns in real-time using hopping windows. A Python data streaming tutorial for web analytics.
Jack Murphy
Words by
Banner image for the blog article "Get started in minutes with the Hello Quix template"
Tutorials

Continuously ingest documents into a vector store using Quix, Qdrant, and Apache Kafka

Learn how to set up a decoupled, event-driven pipeline to embed and ingest new content into a vector store as soon as it's published.
Merlin Carter
Words by