Getting Started with PySpark, Kafka, SQL, and AI


How do you run SQL queries over Kafka in seconds? With PySpark, we get a system where we can run ad-hoc analysis locally and use SQL queries to explore our data quickly.

Stéphane Derosiaux

Sept 17, 2023


PySpark is the Python interface for writing Apache Spark applications and using Spark from the command line. The Apache Spark platform is built to crunch big datasets in a distributed way.

By combining PySpark with Kafka, we get a system that can process real-time data from Kafka in seconds using PySpark commands. This is particularly useful for running ad-hoc analysis locally and exploring our data quickly with SQL queries.

Let's see how to:

  • Get started with PySpark and Kafka

  • Do filtering and transformations using SQL in pyspark

  • Do filtering and SQL transformations with Conduktor

  • Finish with some madness using OpenAI/ChatGPT to generate random JSON into Kafka!

Fasten your seatbelt, we're going to explore a lot of things!

Install PySpark

If you're running on macOS, use Homebrew to install PySpark. Check the Apache Spark website for other systems or installation methods.

$ brew install apache-spark

This will install a ton of things! Spark is a big project with many components, and it relies on the JVM and the Scala programming language at its core. We're only interested in the pyspark component here, which is the Python CLI for Spark.

pyspark should be available in your terminal (open a new one otherwise):

$ pyspark
Python 3.11.5 (main, Aug 24 2023, 15:09:45) [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/14 00:13:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/

Using Python version 3.11.5 (main, Aug 24 2023 15:09:45)
Spark context Web UI available at http://stephane.lan:4040
Spark context available as 'sc' (master = local[*], app id = local-1694643195574).
SparkSession available as 'spark'.
>>>

Wonderful, it works! We have a local Spark instance running, powered by Python, with a local Spark UI available at http://localhost:4040. We'll do some fancy things with this UI later in this article; feel free to explore it.

Let's familiarize ourselves with the pyspark API first by doing some basic operations, like creating a dataset and printing its schema:

>>> data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
>>> df = spark.createDataFrame(data)
>>> df.show()
+------+------+
|    _1|    _2|
+------+------+
|  Java| 20000|
|Python|100000|
| Scala|  3000|
+------+------+
>>> df.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)

A DataFrame (df for short) is a Spark concept. It's like a table in a spreadsheet. It has rows and columns, and each column has a name. Each row is a record that contains data for each column. For example, in a table storing information about books, a row could contain data about a single book: its title, author, and price.

In our test above, we created a DataFrame with 3 rows and 2 unnamed columns.
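If we want named columns instead of the default _1 and _2, we can pass column names when creating the DataFrame. A quick sketch (the names language and users_count are just illustrative):

data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
df = spark.createDataFrame(data, ["language", "users_count"])
df.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# |    Java|      20000|
# |  Python|     100000|
# |   Scala|       3000|
# +--------+-----------+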

Just like you can sort, filter, or perform calculations on a table in a spreadsheet program, you can do similar operations on a DataFrame in a programming environment. The main difference is that DataFrames are designed to handle large (millions, billions) volumes of data efficiently, even data that won't fit into your computer's memory.

That's exactly why Kafka and Spark are a great match! 🚀

Using Spark to read data from Kafka

We can connect our pyspark to Kafka to read data.

First, we need a Kafka cluster! If we don't have one running already, we can use Upstash to get a free Kafka cluster in the cloud. Conduktor will be used to create topics and produce data, and pyspark to do some transformations.

Follow this guide to get your free Kafka cluster up and running in a few minutes: Getting Started with Conduktor and Upstash.

If you haven't installed Conduktor yet, install it and add the configuration of your Upstash cluster.

$ curl -L https://releases.conduktor.io/console -o docker-compose.yml && docker compose up

Then we create a topic "hello" and produce some data into it using our Kafka UI:

All good from a Kafka and Conduktor perspective. Let's go back to Spark.

We start pyspark with the Kafka dependency to be able to read from Kafka:

# If we don't add the spark-sql-kafka package, we will run into this error:
# pyspark.errors.exceptions.captured.AnalysisException: Failed to find data source: kafka.

$ pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1

Then, we use pyspark to read from our topic "hello" for 10 seconds. In the code below, replace the XXX and YYY with your own credentials from Upstash in the kafka.sasl.jaas.config property. ⚠️

Notice the important "startingOffsets": "earliest" option: it tells Spark to start from the beginning of the topic, and not from the end (which is the default behavior).

Copy this whole block into pyspark (don't forget to update with your own credentials):

from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import time

# We will keep the consumer running for 10 seconds
stop_time = datetime.now() + timedelta(seconds=10)  # 10 seconds from now

##
## TODO TO UPDATE kafka.sasl.jaas.config
##
kafka_options = {
    "kafka.bootstrap.servers": "romantic-drake-10214-eu1-kafka.upstash.io:9092",
    "kafka.sasl.mechanism": "SCRAM-SHA-256",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": """org.apache.kafka.common.security.scram.ScramLoginModule required username="XXX" password="YYY";""",
    "startingOffsets": "earliest", # Start from the beginning when we consume from kafka
    "subscribe": "hello"           # Our topic name
}

# Subscribe to Kafka topic "hello"
df = spark.readStream.format("kafka").options(**kafka_options).load()

# Deserialize the value from Kafka as a String for now
deserialized_df = df.selectExpr("CAST(value AS STRING)")

# Query Kafka and wait 10sec before stopping pyspark
query = deserialized_df.writeStream.outputMode("append").format("console").start()
time.sleep(10)
query.stop()

We should see the following output in pyspark, while we produce data in the topic hello using Conduktor:

+---------------+
|          value|
+---------------+
|             my|
|          first|
|        message|
...
+---------------+
only showing top 20 rows

The connection works! Now, let's explore transformations (mappings, filters, etc.) using pyspark and Kafka, this time with JSON, as structured data is easier to work with.
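One more tip before moving on: if we just want a one-off snapshot of the topic rather than a continuous stream, the same kafka_options also work with Spark's batch reader (spark.read instead of spark.readStream). A minimal sketch, reusing the options defined above:

# Batch read: take a bounded snapshot of the topic instead of streaming it
batch_df = (spark.read
    .format("kafka")
    .options(**kafka_options)
    .load()
    .selectExpr("CAST(value AS STRING)"))
batch_df.show(truncate=False)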

Tips: Spark UI to see active queries

Working in a shell can be painful: we start streaming queries reading from Kafka, forget to .stop() them, and lose track of what is running and what is not. We can use the Spark UI anytime to see the active queries and their status:

Example http://localhost:4040/StreamingQuery/:
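We can also list the active queries straight from the pyspark shell, using Spark's streaming query manager:

# List the streaming queries currently active in this Spark session
for q in spark.streams.active:
    print(q.id, q.name, q.status)
    # q.stop()  # uncomment to stop a query we forgot about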

Transforming JSON from Kafka with SQL

We're going the Netflix way and will work with JSON representing "view events", like:

{
  "user_id": 18,
  "content": "Stranger Things",
  "watched_at": "2023-08-10 10:00:01",
  "rating": 5
}

We will produce JSON data (the view events) into Kafka using Conduktor, and read it using pyspark. At the same time, we'll transform the incoming data with SQL, as it's the ubiquitous language for working with structured data.

Produce the JSON above into our topic "hello", then execute the following code:

from pyspark.sql.functions import *
from pyspark.sql.types import *

json_schema = StructType([
  StructField("user_id", StringType()),
  StructField("content", StringType()),
  StructField("watched_at", TimestampType()),
  StructField("rating", IntegerType()),
])

# df is our DataFrame reading from Kafka above
json_df = df.select(from_json(col("value").cast("string"), json_schema).alias("value"))
json_df.printSchema()
# root
#  |-- value: struct (nullable = true)
#  |    |-- user_id: string (nullable = true)
#  |    |-- content: string (nullable = true)
#  |    |-- watched_at: timestamp (nullable = true)
#  |    |-- rating: integer (nullable = true)

query = json_df.writeStream.outputMode("append").format("console").start()
time.sleep(10)
query.stop()

You should see the data from Kafka properly parsed into a struct:

+--------------------+
|               value|
+--------------------+
|{18, Stranger Thi...|
+--------------------+

Let's do some SQL on it now. We need to give a name to the temporary SQL view Spark will create from our DataFrame (we can't use the variable name itself, otherwise we'll run into a TABLE_OR_VIEW_NOT_FOUND error because it isn't known in the SQL context).

We execute this to create a temporary SQL view named netflix_view and display the average ratings by content:

# we select value.* to avoid prefixing field names with "value", e.g. value.content
json_df.select("value.*").createOrReplaceTempView("netflix_view")

averageRatings = spark.sql("SELECT content, AVG(rating) FROM netflix_view GROUP BY content")
query = averageRatings.writeStream.outputMode("complete").format("console").start()
# we must switch to "complete" output mode because we GROUP BY otherwise Spark will fail with:
# Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

time.sleep(10)
query.stop()
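As the comment says, a plain GROUP BY on a stream only works in complete mode. If you really want append mode, one option (a sketch, not needed for the rest of this article) is to aggregate over an event-time window with a watermark on watched_at, so Spark can finalize windows and emit them:

from pyspark.sql.functions import window, avg

# Windowed aggregation + watermark: data later than 10 minutes is dropped,
# which lets Spark close windows and emit them in append mode
windowed_ratings = (json_df.select("value.*")
    .withWatermark("watched_at", "10 minutes")
    .groupBy(window("watched_at", "1 hour"), "content")
    .agg(avg("rating").alias("avg_rating")))

query = windowed_ratings.writeStream.outputMode("append").format("console").start()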

Each time we produce data into our topic, the averageRatings query updates the average ratings by content in real time:

Let's say we produce this into our Kafka topic:

{
  "user_id": 112,
  "content": "The Crown",
  "watched_at": "2023-08-11 10:00:01",
  "rating": 4
}

Spark will display the update:

+---------------+-----------+
|        content|avg(rating)|
+---------------+-----------+
|Stranger Things|       3.75|
|      The Crown|        4.0|
+---------------+-----------+

Thanks to pyspark and Kafka, we have the power to run any SQL query on our data, and see the results updated in real-time in our terminal. Glorious!

Let's take a detour and explore alternatives using Conduktor to do SQL without Spark, or actually without any third-party stream processing platform at all.

Faster: Use Console Kafka UI to do filtering

If we only want to filter messages (no GROUP BY), there is no need for pyspark: Console is a powerful Kafka UI that provides various mechanisms to filter data in Kafka.

Example below: we filter only the messages where rating > 4. We can add as many filters as we want; it's pretty cool and faster than writing a program.
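For comparison, here is what that same filter looks like in pyspark. A minimal sketch, assuming the netflix_view temp view from the previous section is still registered:

# Same "rating > 4" filter, expressed in Spark SQL over the streaming view
good_ratings_df = spark.sql("SELECT * FROM netflix_view WHERE rating > 4")
query = good_ratings_df.writeStream.outputMode("append").format("console").start()
# remember to query.stop() when done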

Smarter: create a VIEW over a topic (advanced)

If you're familiar with Conduktor Gateway, we can create a Virtual SQL Topic, a persistent SQL view over a topic. This is an alternative to Spark and Flink. There is no code to write, no stream processing framework to manage. It's totally seamless and costs absolutely $0 as everything is in memory and processed at runtime!

If you have deployed Conduktor Gateway, you can connect to it on Console and add the SQL Topic interceptor with your SQL query:

{
  "name": "my-virtual-sql-topic-interceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.VirtualSqlTopicPlugin",
  "priority": 100,
  "config": {
    "virtualTopic": "good_ratings",
    "statement": "SELECT * FROM my_netflix_topic WHERE rating >= 4"
  }
}

Using this, all users now have access to a topic named good_ratings (built from your SQL query). The fact that it's a virtual topic is totally transparent to your users: it's not materialized (no storage, no partitions), but it acts like a normal topic. You can use it with kafka-console-consumer, Spring, etc.

A classic use-case we see is to hide the original topic my_netflix_topic from all users, and only expose a subset, like good_ratings here.

It's a great way to hide sensitive data, or to expose only the data you want to expose. You can also change the SQL query on-the-fly, without any consumer being aware of it (if you want to refine your WHERE condition, for instance).
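For example, good_ratings can be consumed from pyspark just like any other topic. A sketch, assuming you point the options at your own Gateway bootstrap servers and security settings (the values below are placeholders):

# Placeholder connection settings: replace with your Gateway's address and credentials
gateway_options = {
    "kafka.bootstrap.servers": "my-gateway:6969",
    "subscribe": "good_ratings",
    "startingOffsets": "earliest",
}

good_df = (spark.readStream
    .format("kafka")
    .options(**gateway_options)
    .load()
    .selectExpr("CAST(value AS STRING)"))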

Generating JSON with AI (advanced)

Ready to go mad and explore more options? Let's use OpenAI (ChatGPT) and Google to do some crazy stuff! This is an optional part as it is more advanced. Feel free to skip it if you're not interested.

We need an OpenAI account and a Google Cloud account. Register for both if you haven't already.

Once that's done, we need an OpenAI API key, a Google API key, and a Google Custom Search Engine (CSE) ID. Then export the following environment variables to make them available to pyspark (you have to quit pyspark (Ctrl+C), add these exports to your shell, and restart pyspark):

# replace with your own values from OpenAI and Google. These values are fake.
export OPENAI_API_KEY=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
export GOOGLE_API_KEY=AIzaSyCfF5XZRSxkmUSjn5SE5tVBZLrzdyw2pFQ
export GOOGLE_CSE_ID=5742144820bd0493e

We'll use pyspark-ai to work with OpenAI from pyspark.

  • If you have a paid OpenAI subscription with access to GPT-4, use this (the default):

from pyspark_ai import SparkAI
spark_ai = SparkAI()
spark_ai.activate()

  • If you don't have access to GPT-4 ("The model gpt-4 does not exist or you do not have access to it"), use GPT-3.5 instead. For this, we use langchain to pass a specific model to SparkAI:

from pyspark_ai import SparkAI
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model_name='gpt-3.5-turbo')
spark_ai = SparkAI(llm=llm, verbose=True)
spark_ai.activate()

The list of available models is here: https://platform.openai.com/account/rate-limits (there are many!).

Once activated, we can use spark_ai to tap into the AI's power. Let's start by creating a DataFrame from Wikipedia about the most-subscribed YouTube channels:

df = spark_ai.create_df("https://en.wikipedia.org/wiki/List_of_most-subscribed_YouTube_channels", [ "name", "subscribers", "language", "category", "country" ])
df.show()

This will output the data properly extracted from Wikipedia:

[..]
+--------------------+-----------+--------+-------------+--------------------+
|                name|subscribers|language|     category|             country|
+--------------------+-----------+--------+-------------+--------------------+
|            T-Series|      249.0|   Hindi|        Music|               India|
|             MrBeast|      183.0| English|Entertainment|       United States|
|           Cocomelon|      165.0| English|    Education|       United States|
|Sony Entertainmen...|      162.0|   Hindi|Entertainment|               India|
|     Kids Diana Show|      113.0| English|Entertainment|Ukraine- United S...|
|           PewDiePie|      111.0| English|Entertainment|              Sweden|
|         Like Nastya|      107.0| English|Entertainment|Russia- United St...|
|       Vlad and Niki|      101.0| English|Entertainment|Russia- United St...|
|   Zee Music Company|       99.5|   Hindi|        Music|               India|
|                 WWE|       97.1| English|       Sports|       United States|
|           Blackpink|       91.2|  Korean|        Music|         South Korea|
|           Goldmines|       89.5|   Hindi|         Film|               India|
|            Sony SAB|       85.2|   Hindi|Entertainment|               India|
|     5-Minute Crafts|       80.2| English|       How-to|              Cyprus|
|           BangtanTV|       76.4|  Korean|        Music|         South Korea|
|         Hybe Labels|       72.6|  Korean|        Music|         South Korea|
|              Zee TV|       72.4|   Hindi|Entertainment|               India|
|       Justin Bieber|       71.9| English|        Music|              Canada|
|            Pinkfong|       69.5| English|    Education|         South Korea|
|ChuChu TV Nursery...|       67.5|   Hindi|    Education|               India|
+--------------------+-----------+--------+-------------+--------------------+
[..]

We are in 2023, right? Thanks to the advent of AI, we now have a user-friendly, natural language query system to manipulate data:

>>> df.ai.verify("expect France not to be in the countries")
Result: True
# it hurts

>>> df.ai.transform("per country").show()
+--------------------+-----------------+
|             country|total_subscribers|
+--------------------+-----------------+
|               India|           1312.2|
|       United States|            678.5|
|Ukraine- United S...|            113.0|
|              Sweden|            111.0|
|Russia- United St...|            208.0|
|         South Korea|            309.7|
|              Cyprus|             80.2|
|              Canada|             71.9|
|              Brazil|             66.6|
|           Argentina|             59.7|
+--------------------+-----------------+

Behind the scenes, it builds a pyspark program (typically with SQL) to answer our queries. This is impressive.

If you're not using GPT-4, you might run into this error, because the dataset of the query is too big to be handled by GPT-3.5:

openai.error.InvalidRequestError: This model's maximum context length is 4097 tokens. However, your messages resulted in 7337 tokens. Please reduce the length of the messages.

Another amazing thing: we can ask for a plot, and it will understand what we expect, build a pyspark program to make it happen, and open our browser for us:

>>> df.ai.plot("by country")

Using AI to generate JSON into Kafka

Now that we understand how AI could help us, let's use it to generate random JSON based on our model.

We use ChatOpenAI directly to ask it to generate similar payloads from our model:

from langchain.chat_models import ChatOpenAI
from langchain.schema import *
llm = ChatOpenAI(model_name='gpt-4')  # or 'gpt-3.5-turbo' if you don't have GPT-4 access
random_json = llm([HumanMessage(content="""generate random minified JSON payloads based on this: {
  "user_id": 112,
  "content": "The Crown",
  "watched_at": "2023-08-11 10:00:01",
  "rating": 4
}""")]).content

random_json

It takes a few seconds until ChatGPT understands and generates spot-on content:

{"user_id":345,"content":"Breaking Bad","watched_at":"2022-12-14 18:30:12","rating":5}
{"user_id":876,"content":"Friends","watched_at":"2023-01-03 21:00:00","rating":4}
{"user_id":290,"content":"Stranger Things","watched_at":"2023-04-16 22:45:01","rating":5}
{"user_id":789,"content":"The Witcher","watched_at":"2023-08-14 20:55:00","rating":4}
{"user_id":654,"content":"Mandalorian","watched_at":"2023-07-20 19:10:10","rating":5}
{"user_id":321,"content":"Peaky Blinders","watched_at":"2023-09-10 22:00:45","rating":4}
{"user_id":154,"content":"Game of Thrones","watched_at":"2023-05-15 21:30:30","rating":3}
{"user_id":903,"content":"Money Heist","watched_at":"2023-10-01 23:00:01","rating":4}
{"user_id":567,"content":"Westworld","watched_at":"2023-06-12 20:10:10","rating":4}
{"user_id":238,"content":"Better Call Saul","watched_at":"2023-07-11 19:00:00","rating":5}

We will use this to produce data into Kafka:

from pyspark.sql import Row

chatgpt_records = random_json.strip().split("\n")
chatgpt_df = spark.createDataFrame([Row(value=x) for x in chatgpt_records])
chatgpt_df.selectExpr("CAST(value AS STRING)").write.format("kafka").options(**kafka_options).option("topic", "hello").save()

Each time we generate a new batch with ChatGPT and send it into Kafka, we should see updated results in our pyspark terminal, as our averageRatings query is still running (otherwise, rerun it):

# Remember: averageRatings = spark.sql("SELECT content, AVG(rating) FROM netflix_view GROUP BY content")

+---------------+-----------+
|        content|avg(rating)|
+---------------+-----------+
|      Westworld|        4.0|
|    Money Heist|        4.0|
|Stranger Things|        4.0|
|      The Crown|        4.0|
|         Narcos|        5.0|
|     The Office|        4.0|
| Peaky Blinders|        5.0|
|   Breaking Bad|        5.0|
|The Mandalorian|        5.0|
|        Friends|        3.0|
|Game of Thrones|        4.0|
+---------------+-----------+

The loop is closed! We produce AI-generated random JSON into Kafka, and read it back in pyspark to run SQL queries on it.

Conclusion

That's the end of this article; we've covered a lot of things!

  • How to get started with PySpark and Kafka

  • How to do filtering and transformations using SQL

  • How to do the same simply with Conduktor

  • How to use OpenAI/ChatGPT to open up the potential of what we can do just by asking questions

pyspark is really useful when we need to run ad-hoc analysis locally and explore our data using SQL. Combined with Conduktor, with its powerful Kafka UI and its Gateway to run SQL on Kafka, it's a powerful stack to explore real-time data and do data exploration and data science on the cheap.
