May 24, 2023 | Ecosystem

The drawbacks of ksqlDB in machine learning workflows

Using ksqlDB for real-time feature transformations isn't as easy as it looks. I revisit the strategy to democratize stream processing and examine what's still missing.


Introduction

Back in 2017, KSQL made its debut with the goal of opening up stream processing to a wider audience. The idea was to simplify the learning curve for Apache Kafka by allowing data scientists to utilize traditional database concepts, shedding the need to juggle multiple mental models. As someone who shares the ambition of making Kafka more accessible, I found the promise of KSQL quite appealing.

As time went on, though, my enthusiasm for KSQL (which later became ksqlDB in 2019) began to waver. While doing market research for my own brainchild, Quix, I found that numerous people were discouraged by ksqlDB's limitations, especially when they tried to incorporate it into machine learning-based workflows.

So, before we delve into the heart of my evolving thoughts on ksqlDB, let's take a step back and quickly recap the fundamentals of how it operates.

How does ksqlDB work?

ksqlDB acts as an abstraction layer that sits on top of Kafka Streams. It's designed to make Kafka Streams functionality more accessible to non-Java developers. This means that data scientists can create streaming transformations without having to get tangled up in the application code. The results of these transformations are streamed to new topics, which are automatically created as part of a query. Applications can then subscribe to these topics and consume the transformed data.

ksqlDB architecture

On the surface, ksqlDB’s deployment model is similar to Apache Flink in that it runs on its own dedicated cluster. This is in contrast to the architectural pattern for Kafka Streams, which can be embedded as a library into any Java application.

The following diagram shows the basics of ksqlDB’s architecture:

ksqlDB architecture and components.
(Source “How it works” — ksqlDB documentation)

Aside from the CLI and user interface, there’s also a ksql-python library which is a wrapper around the ksqlDB REST API. You can see it in action in one of Kai Waehner’s Jupyter notebooks.

ksqlDB stream processing workflow

Data scientists can use ksql-python to create streams and processing logic that runs on the ksqlDB server. Stream transformations are then expressed in the form of SQL statements.

Here are some examples:

Stream Creation

You can use the ‘create_stream’ function to create streams like this:


from ksql import KSQLAPI
client = KSQLAPI('http://localhost:8088')
client.create_stream(table_name='user_logins',
                     columns_type=['user_id int', 'login_timestamp varchar',
                                   'ip_address varchar'],
                     topic='user_logins',
                     value_format='JSON')

This will read data from the Kafka topic “user_logins” and register it as a stream, also called “user_logins”.

Filtering a Stream

You can filter the data from “user_logins” and put the results in a new topic. The create_stream_as method builds the corresponding CREATE STREAM ... AS SELECT statement for you:


# Create the filtered_user_logins stream using the create_stream_as method.
# Under the hood this builds a statement roughly equivalent to:
#   CREATE STREAM filtered_user_logins AS
#       SELECT user_id, login_timestamp, ip_address
#       FROM user_logins
#       WHERE login_timestamp > '2022-01-01T00:00:00Z';
client.create_stream_as(
    table_name='filtered_user_logins',
    select_columns=['user_id', 'login_timestamp', 'ip_address'],
    src_table='user_logins',
    kafka_topic='filtered_user_logins',
    value_format='JSON',
    conditions="login_timestamp > '2022-01-01T00:00:00Z'"
)

If the topic “filtered_user_logins” doesn’t exist, it gets created automatically on first run. The resulting stream is essentially a continuously updated view of the raw data, and it can be queried in much the same way as a table in a relational database.

Querying a materialized view

This is done with a so-called “pull query” and represents a snapshot of the current state at any given time. For example, if you had a table that aggregated total logins by IP address, a client application could execute a query like this:


query = client.query('SELECT * FROM logins_by_ip')
for item in query:
    print(item)

A data scientist could run pull queries in a Jupyter Notebook and do ad-hoc data exploration on a snapshot of the data.

Performing Complex Calculations with UDFs

If you need to perform specific data transformations or calculations that aren’t available in ksqlDB’s built-in functions, you’ll need to create a UDF (User-Defined Function), a small piece of Java code that implements the custom logic.

For example, imagine you are processing a stream of IoT sensor data, and you want to calculate the "heat index" based on temperature and humidity values. The heat index is a measure of how hot it feels to the human body when humidity is factored in with the actual air temperature. The heat index calculation is not a straightforward formula and is not available as a built-in function in ksqlDB.

To calculate the heat index in ksqlDB, you would create a custom UDF named `HEAT_INDEX` that takes temperature and humidity as input parameters and returns the calculated heat index value (e.g. using the Rothfusz regression or another suitable formula). When you’ve done that, you need to deploy the `HEAT_INDEX` UDF to your ksqlDB server. Bear in mind that this is only possible if you host ksqlDB yourself, but I’ll get into that later.
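
Once deployed, the UDF can be called from SQL like any built-in function, including via ksql-python. Here’s a minimal sketch, assuming a hypothetical “sensor_readings” stream with device_id, temperature and humidity columns, and a ksqlDB server running locally:


from ksql import KSQLAPI

client = KSQLAPI('http://localhost:8088')

# Hypothetical example: HEAT_INDEX is the custom Java UDF deployed to the
# ksqlDB server; the stream and column names are assumptions.
client.ksql("""
    CREATE STREAM sensor_readings_with_heat_index AS
        SELECT device_id,
               temperature,
               humidity,
               HEAT_INDEX(temperature, humidity) AS heat_index
        FROM sensor_readings
        EMIT CHANGES;
""")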

Right. That concludes my whirlwind tour of ksqlDB. Now let’s look at the drawbacks of this approach, especially from the perspective of ML engineers and data scientists.

The vision for ksqlDB as the glue in machine learning workflows

ksqlDB's role connects to a broader vision for Apache Kafka as a central system for machine learning workflows. The following diagram, from Confluent's article “Using Apache Kafka to Drive Cutting-Edge Machine Learning”, captures this vision:

Architecture for machine learning in mission critical real-time applications.

Field CTO of Confluent, Kai Waehner, recognized ksqlDB's potential to bridge the gap between data scientists, who use Python and SQL, and software engineers, who use Java. His demos and content after ksqlDB's launch, such as “Machine Learning With Python, Jupyter, KSQL, and TensorFlow”, demonstrate why he thought ksqlDB was the missing link for machine learning workflows.

“... KSQL can feel Python-native with the ksql-python library, but why use KSQL instead of or in addition to your well-known and favorite Python libraries for analyzing and processing data? The key difference is that these KSQL queries can also be deployed in production afterwards.” (bolding mine)

This goal, letting data scientists move from prototype to production with minimal code changes, is commendable. Yet, ksqlDB hasn't fully realized this vision in practice. Nevertheless, I believe the core idea behind ksqlDB still holds promise, but its weaknesses must be addressed first.

What are the drawbacks of ksqlDB when it comes to ML workflows?

In practice, ksqlDB isn't well-suited for the complex transformations needed for feature engineering. Instead of reducing the gap between data scientists and software engineers, it reinforces it. To understand why, let’s walk through an example pipeline. The following illustration depicts a real architecture that uses ksqlDB:

ksqlDB mismatch scheme.

It’s representative of typical architectures we’re seeing in real-time applications (such as food delivery and micro-mobility apps). In these architectures, engineers aim to enhance the responsiveness of machine learning (ML) estimates through real-time feature engineering and real-time inference.

Here's a summary of the process:

1) Raw data is ingested from the event-driven application into Kafka and persisted to BigQuery for offline feature engineering and training machine learning models.

2) When the data engineering and data science is done, the feature code and the model artifacts are deployed to a real-time production environment.

3) The feature calculations are implemented in ksqlDB and applied to incoming data streamed from mobile app users (via Apache Kafka), then written to an online feature store (a Redis Cache).

4) The app requests a prediction from the machine learning model which is deployed as an API. The model queries the feature store to get the freshest feature data and serves results back to the app.
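
To make step 4 a bit more concrete, here’s a minimal sketch of how the model API might fetch the freshest features from the Redis feature store before serving a prediction (the key scheme and feature names are hypothetical assumptions):


import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def predict_delivery_time(user_id: str, model) -> float:
    # Fetch the freshest precomputed features for this user from the online store
    features = r.hgetall(f"features:user:{user_id}")
    if not features:
        raise ValueError(f"No features found for user {user_id}")
    # Order the feature values the way the model expects (hypothetical names)
    x = [float(features["avg_order_value"]),
         float(features["orders_last_hour"]),
         float(features["minutes_since_last_order"])]
    return model.predict([x])[0]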

However, there are two lines of friction, making it difficult to transfer data and code. This friction is caused by two technical mismatches:

  • Mismatch 1: different development and production environments.

    This friction typically presents itself in three ways:

    1) When feature code has to be re-engineered from Python or BigQuery-SQL (depending on whether the features were developed directly in BigQuery or in Jupyter Notebooks) to ksqlDB’s SQL syntax.

    2) When engineering the ML model API to query Redis for the right data at the right time.

    3) When testing and debugging the production pipeline, especially when comparing production results with results derived in the offline development environment.
  • Mismatch 2: different architectural patterns in production.

    Data is flowing from left to right in an event-driven pattern (a typical design of modern apps) while the machine learning model is served behind a REST API that is calling a Redis feature store (a typical pattern in modern ML systems).

    Friction in this hybrid architecture can manifest as:

    1) Lower model accuracy if the feature data available at request time doesn't match what the model expects.

    2) Increased model retries and time-outs from waiting for feature data.

    3) Poor user experience from slow model request/response cycles.

Clearly not all of these drawbacks are directly related to ksqlDB. However, that first mismatch is partially related to a number of weaknesses in how ksqlDB works.

Let’s run through them one by one:

Data scientists need to work in a mishmash of SQL and Python

This might not seem like a big deal since many data scientists know both languages. But applications built on machine learning models need languages that support proper control flow and logic, and SQL isn't a true programming language; it's a declarative one.

Certainly, ksqlDB is by no means the only SQL-based stream processing framework—there’s also Flink and Spark. Yet, they all have lower-level APIs for software engineers to use in their Java applications. This reinforces the separation of concerns between engineers and data scientists.

One of Kai Waehner’s slides on ksqlDB summarizes this dilemma nicely:

SQL skills slide.

The promised benefit of ksqlDB was that data scientists could code exclusively in Python and pipe the results of queries into Pandas (or any other library that’s popular in the ML and data science communities).

Yet anyone who has had to code a mix of SQL and Python quickly discovers that SQL statements embedded in Python strings get no linting or code completion and, in many editors, only limited syntax highlighting.

Also, for complex data processing tasks, pure Python and libraries such as Pandas are much more flexible than SQL—so there’s really little need to use it. I don’t want to get sidetracked by the wider “Pandas vs SQL debate”, but I do recommend the 4-part series Pandas vs. SQL published by Ponder (a data science tool that helps you scale Pandas processing tasks) which goes into precise detail on why Pandas is far more suited for complex data processing tasks.
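
To illustrate the kind of feature logic that’s trivial in Pandas but awkward in ksqlDB’s SQL dialect, here’s a small sketch (the DataFrame and column names are hypothetical):


import pandas as pd

# Hypothetical login events: one row per login, indexed by event time
logins = pd.DataFrame(
    {"user_id": [1, 1, 2, 1, 2]},
    index=pd.to_datetime([
        "2022-01-01 10:00", "2022-01-01 10:20", "2022-01-01 10:25",
        "2022-01-01 11:30", "2022-01-01 12:40",
    ]),
)

# Feature: how many times each user logged in during the preceding hour.
# A per-user, event-time rolling count is a one-liner in Pandas; in ksqlDB it
# would need a windowed aggregation plus a join back onto the original stream.
logins["logins_last_hour"] = (
    logins.groupby("user_id")["user_id"]
          .transform(lambda s: s.rolling("1h").count())
)

# Follow-up logic: trivial control flow in Python, clunky in SQL
logins["is_bursty"] = logins["logins_last_hour"] >= 2
print(logins)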

Data scientists are still dependent on Java developers for custom logic

After talking to developers in the ML community, I've found that there are many tasks ksqlDB simply cannot do. To fill these gaps, they often need to go back to Java, either in the form of UDFs or Kafka Streams applications.

The problem with UDFs

Remember the earlier example I provided with the heat index calculation? It turns out that this level of complexity is pretty common in machine learning workflows. ML engineers rely heavily on UDFs, which complicates things: you need an extra set of processes for creating and managing UDFs in parallel to the work of managing the pure-SQL transformations.

Moreover, due to differences in skills, UDFs and SQL queries are often maintained by separate teams. This ties into another major issue with UDFs: Confluent Cloud’s hosted version of ksqlDB doesn’t support them. Thus, if you really need UDFs (which many ML teams do), you’ll need to run ksqlDB in your own cluster. This means that there’s often one team maintaining the cluster and deploying UDFs and another managing the SQL-based processing logic. In these instances, ksqlDB sadly does not help to reduce the impedance gap.

Custom Session Windows

There are other problems that even UDFs can’t solve. One example is custom window operations, which, again, aren’t as rare as you’d think.

For instance, let's say you want to build a custom session window aggregator for a food delivery app.

  • You may want to define a session window based on user interactions where a session is considered active only if a user completes at least two actions (e.g. adding items to the cart and placing an order) within a custom time window of 45 minutes.
  • In ksqlDB, the built-in session window functionality allows you to set a gap duration; however, it doesn't provide an easy way to incorporate the additional condition of requiring a minimum number of user actions within the time window.
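
To make the gap concrete, here’s a rough sketch of that session logic in plain Python. It’s the kind of stateful, conditional processing that’s natural to express as code but hard to fit into ksqlDB’s windowing DSL (the 45-minute gap and two-action threshold follow the example above; everything else is a simplified assumption):


from datetime import datetime, timedelta

SESSION_GAP = timedelta(minutes=45)
MIN_ACTIONS = 2

# In-memory state: user_id -> timestamps of actions in the current session
sessions = {}

def process_event(user_id: str, event_time: datetime):
    """Track per-user sessions; only emit a session once it has enough actions."""
    actions = sessions.get(user_id, [])

    # Start a new session if the gap since the last action exceeds 45 minutes
    if actions and event_time - actions[-1] > SESSION_GAP:
        actions = []

    actions.append(event_time)
    sessions[user_id] = actions

    # The condition ksqlDB's built-in session windows can't express easily:
    # the session only counts once the user has completed at least two actions
    if len(actions) >= MIN_ACTIONS:
        return {"user_id": user_id, "session_start": actions[0], "action_count": len(actions)}
    return None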

Custom state stores

A related problem is how ksqlDB handles stateful operations. ksqlDB provides built-in stateful stream processing capabilities, such as windowed aggregations, joins, and session windows, which use RocksDB as the default state store internally. However, ksqlDB does not expose a direct way to define or interact with custom state stores, as it abstracts away many low-level details to simplify the stream processing experience. For example, it does not support complex data structures such as conflict-free replicated data types (CRDTs), which help to ensure data consistency when there are many competing operations such as multiple edits to a document or bids for a certain stock.

In both of these scenarios which require custom logic, you would have to build and deploy a small Kafka Streams application. This again means that data scientists need to turn to software engineers for help.

So far, we’ve looked at the drawbacks from the perspective of a data scientist. Yet, even if you’re an engineer who is comfortable with Java, using ksqlDB with Apache Kafka (as opposed to Flink or Spark) can pose some extra challenges.

Architectural Drawbacks

Since ksqlDB uses Kafka Streams as its underlying technology, it also inherits Kafka Streams' limitations. Back when ksqlDB was originally announced, Jesse Anderson (managing director of the Big Data Institute) highlighted these with some convincing arguments.

Here’s a summary of Anderson's two key points:

  • Kafka Streams does not have proper checkpointing
    To recover safely from an outage or crash, you need to have proper checkpointing in place. However, Kafka Streams does not have this, which can lead to hours of downtime as it tries to recover from where it left off. This is because it must replay all the state mutation messages from the beginning to reconstruct the state. This process can be time-consuming, especially if there are a large number of keys and state changes. The more state changes that have occurred, the longer it will take to replay and rebuild the state in its local state store (RocksDB), which can cause extended downtime.
  • Shuffle sorting slows down when you change keys
    Shuffle sorting helps ensure that data that belongs together is processed on the same partition. It’s achieved by grouping the streamed data by a certain key. However, Kafka Streams tends to struggle when you decide that you want to group by a different key. This is because Kafka Streams handles shuffle sort differently than Flink or Spark Streaming, by creating a new internal repartitioning topic for key changes (for example changing “tracking_ID” to “deviceID”). This approach can lead to increased load and data on Kafka brokers, potentially causing performance issues or even cluster failures if users are not aware of the internal repartitioning topic creation. By Anderson’s estimation, this would slow a shuffle by up to 50% of its normal speed.

Limitations for ksqlDB in Confluent Cloud

Running ksqlDB yourself can be a hassle because you have to manage the cluster on your own. Instead, many teams prefer to run ksqlDB in Confluent Cloud. However, running ksqlDB in Confluent Cloud introduces its own limitations. As previously mentioned, you can’t use UDFs at all, but there are others too. The main ones are the number of persistent queries you’re allowed to have (20 per cluster) and the number of clusters per environment (max 10). This is an important consideration for scaling your feature processing pipeline. The number of persistent queries can quickly add up, since feature transformations often require multiple steps, with each step being handled by a separate query.

Despite its drawbacks, ksqlDB is a step in the right direction

Although I’ve spent most of my time pointing out the drawbacks of ksqlDB, there’s clearly a need for a product like it. Confluent saw that data folks often struggled with Apache Kafka and stream processing paradigms and attempted to build a tool that was more amenable to a data scientist’s way of working. They were right to try and open up Kafka to Python users.

However, I don’t think the solution is to build a Python wrapper around an existing Java framework. Data scientists and engineers need a tool that is written in Python from the ground up, otherwise there’s always going to be a disconnect between development and production. There are a few pure Python tools that are attempting to fill this gap, such as Faust and Bytewax, but there is still more work to be done before this impedance gap is bridged properly.

To really understand where impedance gaps are, and how we can fill them, it helps to take a closer look at all the stages of an ML workflow, which is exactly what I do in my companion article “United we stream: easing collaboration between data scientists and engineers for real-time pipelines”.

