Hi,
we are facing some performance issues when running a Spark Job Definition with a Structured Streaming job that reads data from Kafka. We just want to dump the Kafka data into a Lakehouse for further processing in later steps.
Our issue is that we see an average latency of around 30 seconds between an event becoming visible in Kafka and the time it is picked up by the Spark job. It seems that the overall micro-batch processing time is the main issue, and I don't know why it takes so long. I have already tried different cluster sizes, but since we are just passing the data through, the cluster size doesn't really matter. At a minimum, the number of vCores should match the number of Kafka partitions, which in our case is 6.
I took the same source code and ran it in a Databricks notebook, where I get an average latency of 0.6 seconds, so it's not an issue on the source side.
Here is our source code:
import sys
import os

from notebookutils import mssparkutils
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import pyspark.sql.functions as f
import pyspark.sql.types as t

if __name__ == "__main__":

    # Spark session builder
    # Disable delta.optimizeWrite and V-Order to ensure data is written as fast as possible
    spark = (SparkSession
             .builder
             .appName("test")
             .config("spark.microsoft.delta.optimizeWrite.enabled", "false")
             .config("spark.sql.parquet.vorder.enabled", "false")
             .config("spark.ms.autotune.enabled", "true")
             .getOrCreate())

    spark_context = spark.sparkContext
    spark_context.setLogLevel("DEBUG")

    # Print the effective pool and resource configuration for reference
    print("spark.synapse.pool.name : " + spark.conf.get("spark.synapse.pool.name"))
    print()
    print("spark.driver.cores      : " + spark.conf.get("spark.driver.cores"))
    print("spark.driver.memory     : " + spark.conf.get("spark.driver.memory"))
    print("spark.executor.cores    : " + spark.conf.get("spark.executor.cores"))
    print("spark.executor.memory   : " + spark.conf.get("spark.executor.memory"))
    print("spark.executor.instances: " + spark.conf.get("spark.executor.instances"))
    print()
    print("spark.dynamicAllocation.enabled      : " + spark.conf.get("spark.dynamicAllocation.enabled"))
    print("spark.dynamicAllocation.maxExecutors : " + spark.conf.get("spark.dynamicAllocation.maxExecutors"))
    print("spark.dynamicAllocation.minExecutors : " + spark.conf.get("spark.dynamicAllocation.minExecutors"))

    checkpoint_path = "Files/checkpoint/path/to/checkpoint/location"

    # Read from Kafka; works with API key and secret
    df = (
        spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<my kafka server>:9092")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format("<my kafka api key>", "<my kafka api secret>"))
        .option("kafka.ssl.endpoint.identification.algorithm", "https")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("subscribe", "<my kafka topic>")
        .option("startingOffsets", "earliest")
        .option("failOnDataLoss", "false")
        .load()
    )

    # Add metadata to the dataframe, e.g. current timestamp, and convert key and value to string
    df = df.withColumns({
        "key": f.col("key").cast("string"),
        "value": f.col("value").cast("string"),
        "timestamp_datalake": f.current_timestamp()
    })

    # Write the stream into a Lakehouse Delta table
    df_stream = (
        df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .outputMode("append")
        .toTable("my_test_table")
    )

    df_stream.awaitTermination()
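To see where the ~30 seconds per micro-batch actually go, the per-batch timing can be broken down with a StreamingQueryListener. This is only a minimal diagnostic sketch, assuming a runtime on Spark 3.4 or later (where the Python listener API is available); the class name BatchTimingListener is just an example:

    from pyspark.sql.streaming import StreamingQueryListener

    class BatchTimingListener(StreamingQueryListener):
        """Print where each micro-batch spends its time (diagnostic only)."""

        def onQueryStarted(self, event):
            pass

        def onQueryProgress(self, event):
            p = event.progress
            # durationMs contains entries such as triggerExecution, addBatch,
            # getBatch, latestOffset and walCommit; whichever dominates points
            # to the bottleneck (source, sink or checkpointing).
            print(p.batchId, p.numInputRows, p.durationMs)

        def onQueryTerminated(self, event):
            pass

    # Register before starting the writeStream
    spark.streams.addListener(BatchTimingListener())

On older runtimes, the same numbers can be read from df_stream.lastProgress or df_stream.recentProgress while the query is running.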
This is a sample of the Spark UI:
BTW: Importing the same data with an Eventstream from Kafka into a KQL database takes even longer, around 40 seconds average latency.
Any help is much appreciated.
Michael
Hi @Anonymous,
thanks for your answer.
I have implemented your suggestions and here are the results:
The topic produces messages at a rate of ~10 messages per second, each message ~3 kB in size (roughly 30 kB/s). Compared to what is mentioned in the linked blog entry, this is a drop in the ocean.
Regards,
Michael
Hi @MiSchroe ,
Given that the same code runs efficiently on Databricks, the problem may be in the configuration or environment of the current setup.
I have the following suggestions for performance tuning (a configuration sketch follows the list):
- Increase spark.executor.heartbeatInterval from 10 seconds to 20 seconds.
- Increase driver and executor memory.
- Increase parallelism, e.g. by increasing the number of partitions of the Kafka topic.
- Size the executors correctly.
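As a rough sketch only, these settings could be applied in the session builder of your job roughly like this. The concrete values are placeholders rather than tested recommendations, and depending on how Fabric provisions the pool, some of them may need to be set in the environment or Spark Job Definition settings instead of in code:

    from pyspark.sql import SparkSession

    spark = (SparkSession
             .builder
             .appName("test")
             # Suggestion 1: longer heartbeat interval
             .config("spark.executor.heartbeatInterval", "20s")
             # Suggestion 2: more driver and executor memory (placeholder values)
             .config("spark.driver.memory", "8g")
             .config("spark.executor.memory", "8g")
             # Suggestion 4: size executors so the total core count covers the 6 Kafka partitions
             .config("spark.executor.cores", "3")
             .config("spark.executor.instances", "2")
             .getOrCreate())

    # Suggestion 3 concerns the Kafka topic itself; on the Spark side the Kafka source
    # can additionally be asked to split input via .option("minPartitions", "12").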
More details on performance tuning can be found in this article:
Performance Tuning of an Apache Kafka/Spark Streaming System | HPE Developer Portal
Best Regards,
Yang
Community Support Team
If any post helps, please consider accepting it as the solution to help other members find it more quickly.
If I have misunderstood your needs or you still have problems, please feel free to let us know. Thanks a lot!