How to optimize Spark DataFrames using repartition?
Let's first look at what happens when we repartition a DataFrame!
When you repartition a DataFrame, you effectively tell Spark how you want your data distributed across the nodes of the cluster. You do this by specifying the number of partitions Spark should create.
When the DataFrame is written to disk, the number of partitions you choose determines how many files are produced. For example, if you have a DataFrame with 100 records and set 10 partitions, Spark will generate 10 files, each with 10 records. So now let's look a little bit into the code:
Below is a sample DataFrame.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [("James", "Don", "Smith", "36636", "M", 3000),
        ("Michael", "Rose", "Rise", "40288", "M", 4000),
        ("Robert", "White", "Williams", "42114", "M", 4000),
        ("Maria", "Anne", "Jones", "39192", "F", 4000),
        ("Jen", "Mary", "Brown", "34479", "F", -1)]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)
df.display()
Spark distributes the data into a certain number of partitions by default. Let's look at the default partitions that are created, using the .rdd.getNumPartitions() method.
df.rdd.getNumPartitions()
Now let's increase the number of partitions of the DataFrame to 10 using the repartition method:
df = df.repartition(10)
df.rdd.getNumPartitions()
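To see the relationship between partitions and output files mentioned earlier, here is a small sketch that writes the repartitioned DataFrame to disk; the output path /tmp/repartition_demo is just an illustrative placeholder:
# Each partition is written as a separate part file, so the target
# directory should end up with 10 part-*.parquet files.
df.write.mode("overwrite").parquet("/tmp/repartition_demo")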
*Note: the repartition method is not recommended for reducing the number of partitions of a DataFrame, because it performs a full shuffle of the data across the cluster; coalesce is usually preferred for that, as sketched below.
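As a quick illustration of that alternative, coalesce merges existing partitions instead of reshuffling every row; the target of 2 partitions below is arbitrary:
# coalesce avoids a full shuffle, which makes it cheaper than repartition
# when you only want to reduce the number of partitions.
df_small = df.coalesce(2)
df_small.rdd.getNumPartitions()  # 2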
Repartitioning distributes the data uniformly across the nodes of the cluster, which improves performance when reading and writing because the data is local to the node where it is stored.
How should I decide the optimal number of partitions of my data?
Basically, there are two problems that can occur if you are not aware of partitioning:
-Having too few partitions may not leverage parallelism, and worker nodes can sit idle.
-Having too many partitions may cause task scheduling to consume more time than the actual execution.
A common rule of thumb for picking a partition count is sketched below.
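The heuristic is roughly 2-4 partitions per CPU core available to the application; the multiplier of 3 below and the use of defaultParallelism are assumptions for illustration, not something prescribed by this post:
# defaultParallelism usually reflects the total number of cores Spark sees.
cores = spark.sparkContext.defaultParallelism
target_partitions = cores * 3  # assumed multiplier, tune for your workload

df = df.repartition(target_partitions)
df.rdd.getNumPartitions()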
So the best way is to enable AQE, i.e. Adaptive Query Execution. We will look into it in detail in a future post, but for now let's have a short introduction to it.
So what is AQE?
Adaptive Query Execution (AQE) is a fantastic feature of Spark 3.0 that reoptimizes and modifies query plans based on runtime statistics gathered during query execution.
Below are the features of AQE:
- Reducing Post-shuffle Partitions.
- Switching Join Strategies to broadcast Join.
- Optimizing Skew Join.
How does it work and how do we utilize it?
To utilize this feature in our Spark code, we just have to enable it, i.e. set its configuration to true. Below is how we can do it:
spark.conf.set("spark.sql.adaptive.enabled", "true")
Just by adding the above line, you can leverage the power of AQE. If you want to reduce the post-shuffle partitions, use:
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
And to optimize skewed joins, add:
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
If enabled, this feature of Spark takes care of choosing an optimal partition size for your data, so you don't have to manually calculate the data size, the number of cores, or anything else.
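As a minimal sketch of how these settings fit together, here is an end-to-end snippet; the groupBy aggregation on the sample DataFrame is only illustrative and not part of the original example:
# Enable AQE along with post-shuffle partition coalescing and skew-join handling.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Any shuffle-producing query now lets AQE pick the post-shuffle partition
# count at runtime; this aggregation is just an example of such a query.
agg_df = df.groupBy("gender").avg("salary")
agg_df.show()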
Conclusion
Though repartition is an expensive operation, if used in the right manner it can be advantageous to us in terms of parallelism and optimal utilization of our resources.