Lessons Learned: SortMergeJoin vs ShuffleHashJoin in Modern Data Processing
Introduction
In big data processing, join operations are often the most resource-intensive and time-critical components of a data pipeline. When dealing with large-scale joins, understanding the underlying join strategies becomes crucial for optimization. This drives me to explore how modern tools and plugins can enhance query performance and efficiency in distributed data processing systems.
This exploration begins with examining how modern data processing engines like Comet (based on Apache DataFusion) and Databricks' Photon revolutionize join operations by replacing traditional SortMergeJoin with ShuffleHashJoin. Through systematic testing and analysis, I'll uncover the mechanics behind these join strategies and reveal why this optimization path represents a significant leap forward in query performance.
Understanding Join Strategies
SQL Join Basics
SQL joins are fundamental operations that data engineers use daily to combine data from multiple tables into meaningful business insights. Consider a common scenario: your transaction data contains order IDs and customer IDs, but business users need richer information. They want to know customer names and cities, and to analyze purchasing patterns across different regions. This is where JOIN operations become essential, connecting data points stored across different tables to create a complete view of the business.
For example, when analyzing sales performance, we often need to combine transaction records with store and location data. In our test case, we're joining transaction data with store and country information to answer questions like "What's the total revenue per country?" or "Which stores are performing best in each region?". This is implemented through SQL joins:
SELECT
    transactions.id,
    amount,
    countries.name as country_name,
    employees,
    stores.name as store_name
FROM transactions
LEFT JOIN stores ON transactions.store_id = stores.id
LEFT JOIN countries ON transactions.country_id = countries.id
When executing join queries, both traditional databases like MySQL and distributed systems like Spark follow a similar concept of query planning and optimization. In MySQL, the query optimizer generates an execution plan that determines how to efficiently access and combine data from different tables, considering factors like available indexes and table statistics.
In Spark, when we run our join query between transactions, stores, and countries tables, the Spark SQL engine also generates a query execution plan. You can see this plan in the Spark UI SQL tab, which shows how Spark will distribute the work across the cluster, manage data shuffling, and choose the appropriate join strategy. The key difference is that Spark needs to consider distributed data processing aspects - how to partition data across nodes, minimize data movement, and handle large-scale datasets efficiently.
Source: https://dataninjago.com/2021/12/16/spark-deep-dive-5-catalyst-queryexecution/
Shuffle in Distributed Processing
In distributed systems like Spark, data is naturally partitioned across multiple nodes in a cluster. Taking our earlier join query, imagine the transactions table is spread across 10 different nodes, while stores and countries tables are distributed across several nodes as well. When we need to join these tables, we face a fundamental challenge: records that need to be joined together might live on different nodes.
This is where shuffle comes in. Shuffling is the process of redistributing data across the cluster to ensure that all records with the same join key end up on the same executor node. For example, when joining transactions with countries on country_id:
FROM transactions
LEFT JOIN countries ON transactions.country_id = countries.id
Spark needs to ensure all transactions and country records with the same country_id are co-located. This requires:
Determining target partitions based on join keys
Network transfer of data between nodes
Writing and reading shuffle files
Understanding shuffle operations is crucial because they involve expensive disk I/O and network transfer, often becoming the main bottleneck in distributed join performance.
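To make the co-location idea concrete, here's a minimal pure-Python sketch of hash partitioning. It is only an illustration: the rows are made up, the helper names (partition_for, shuffle) are hypothetical, and Spark uses its own hash function rather than Python's hash(). The point is that both tables are partitioned by the same key with the same function, so matching rows end up in the same partition.

```python
from collections import defaultdict


def partition_for(key, num_partitions):
    """Pick a target partition from the join key (hash partitioning)."""
    # hash() of a small int is the int itself in CPython, so this is deterministic here.
    return hash(key) % num_partitions


def shuffle(rows, key_fn, num_partitions):
    """Group rows by target partition so equal keys always land together."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_for(key_fn(row), num_partitions)].append(row)
    return dict(partitions)


# Made-up sample rows, not the real test data
transactions = [
    {"id": 1, "country_id": 1, "amount": 10.0},
    {"id": 2, "country_id": 2, "amount": 25.5},
    {"id": 3, "country_id": 1, "amount": 4.2},
]
countries = [{"id": 1, "name": "country_1"}, {"id": 2, "name": "country_2"}]

tx_parts = shuffle(transactions, lambda r: r["country_id"], num_partitions=4)
country_parts = shuffle(countries, lambda r: r["id"], num_partitions=4)

# Every partition that holds transactions also holds the matching country row
co_located = all(
    {t["country_id"] for t in rows} <= {c["id"] for c in country_parts.get(p, [])}
    for p, rows in tx_parts.items()
)
```

In a real cluster each partition lives on an executor, and the "append" step is where the network transfer and shuffle file writes happen.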
Spark's Join Strategies for Shuffled Data
While Spark supports multiple join strategies (Broadcast Hash Join, Shuffle Hash Join, Sort Merge Join, Cartesian Join, and Broadcast Nested Loop Join) - each optimized for different scenarios - this exploration focuses on two main strategies for large-scale distributed joins: SortMergeJoin and ShuffleHashJoin. For readers interested in other join strategies, there are excellent deep-dive articles available on Substack and Medium by data professionals.
Let's examine how these two key strategies handle our test case:
Tip: Starting with Spark 3.0, Adaptive Query Execution (AQE) can switch from SortMergeJoin to Broadcast Hash Join at runtime if it determines that one side of the join is small enough to fit into executor memory. This optimization can significantly improve performance by reducing or eliminating shuffle operations, because the partitions of the large table no longer need to be exchanged between nodes.
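To contrast the two strategies, here's a minimal pure-Python sketch of what each one does inside a single partition after the shuffle. It is a simplification under stated assumptions: the function names and sample rows are hypothetical, everything fits in memory, and real engines work on binary row formats rather than dicts. Both functions implement a left outer join and produce the same matches; only the mechanics differ.

```python
def sort_merge_join(left_rows, right_rows, left_key, right_key):
    """Left outer join: sort both sides, then advance two cursors in key order."""
    left = sorted(left_rows, key=lambda r: r[left_key])
    right = sorted(right_rows, key=lambda r: r[right_key])
    out, j = [], 0
    for l in left:
        # Skip right rows whose key is smaller than the current left key
        while j < len(right) and right[j][right_key] < l[left_key]:
            j += 1
        k, matched = j, False
        # Emit every right row that shares the current left key
        while k < len(right) and right[k][right_key] == l[left_key]:
            out.append((l, right[k]))
            matched = True
            k += 1
        if not matched:
            out.append((l, None))
    return out


def hash_join(stream_rows, build_rows, stream_key, build_key):
    """Left outer join: hash the build side once, then probe with each streamed row."""
    table = {}
    for b in build_rows:
        table.setdefault(b[build_key], []).append(b)
    out = []
    for s in stream_rows:
        matches = table.get(s[stream_key])
        if matches:
            out.extend((s, b) for b in matches)
        else:
            out.append((s, None))
    return out


def summarize(pairs):
    """Reduce joined pairs to (transaction id, store name) for easy comparison."""
    return sorted((t["id"], s["name"] if s else None) for t, s in pairs)


# Made-up sample rows; store 5 does not exist, so transaction 13 has no match
stores = [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}]
transactions = [
    {"id": 10, "store_id": 2},
    {"id": 11, "store_id": 1},
    {"id": 12, "store_id": 2},
    {"id": 13, "store_id": 5},
]

smj_result = summarize(sort_merge_join(transactions, stores, "store_id", "id"))
hash_result = summarize(hash_join(transactions, stores, "store_id", "id"))
```

The sort-merge version has to sort the transactions before it can emit anything, while the hash version only materializes the small side and touches each transaction once.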
Now that we understand the fundamentals of join strategies in distributed processing, let's dive into our test implementation to see how these different approaches perform in practice. Through systematic testing, we'll examine how Comet and Photon optimize join operations by favoring ShuffleHashJoin over traditional SortMergeJoin.
Test Setup and Implementation
To demonstrate join strategies at scale, I've simulated a realistic dataset that mirrors our business data volumes:
Transactions: 150 million records (large fact table)
Stores: 99 records (dimension table)
Countries: 12 records (dimension table)
This data composition reflects a typical business scenario where we have a large number of transactions across a fixed set of stores and countries.
Setting up Spark Environment
I created a simple Docker setup based on the apache/spark:3.5.0 image to run our tests. The environment consists of:
FROM apache/spark:3.5.0
COPY extra-jars/* $SPARK_HOME/jars/
The docker-compose configuration spins up three essential services:
Spark Master: Manages the cluster and resource allocation
Spark Worker: Executes the actual computations
Spark History Server: Provides detailed query analysis and visualization
The History Server is particularly useful for examining the query plans and understanding how Spark executes our join operations.
Starting with a single Spark Worker provides a clean baseline and ensures a fair comparison between the local test scenarios: default Spark and Comet.
To run the Photon tests, you'll need:
Access to a Databricks workspace
Permission to create and run jobs
A workspace that supports the Photon runtime and Spark 3.5 (DBR 15.4 LTS)
Why a Databricks Job? Running the test as a job gives you isolated resources:
Dedicated compute resources, just like our local setup
A cluster configured with exactly 1 worker and 2 CPU cores to match our local environment
No interference from other users or workloads
For Comet tests:
Follow the installation guide at https://datafusion.apache.org/comet/user-guide/installation.html
Download the version compatible with: Spark 3.5.0 / Scala 2.12
Once downloaded, place these jars in the extra-jars directory of our Docker setup to enable Comet's optimized join operations later.
Data Preparation Script
For data preparation, I use prepare_dataset.py to generate our test tables. The script leverages Spark's rand() function to create realistic data distributions across:
150M transactions with randomized amounts
99 stores with varying employee counts
12 countries
The generated datasets are written to the local ./apps folder, which is mounted into the Spark Docker containers at /app, making them readily available for our tests.
version: '3'
services:
  spark-master:
    volumes:
      - ./apps:/app
      - ./extra-jars:/extra-jars
  ...
For the Databricks test, we need to write the prepared datasets to cloud storage that Databricks can access. The data can be uploaded to DBFS or to other storage you have access to, such as S3, Azure Blob Storage, or GCS.
Test Implementation
Let's look at the test implementation in job.py where we execute our join query across different configurations:
from pyspark.sql import SparkSession


def run_shuffle_test(plugin):
    # Enable event logging so the run can be inspected in the History Server
    spark = SparkSession.builder.appName("ShuffleTest") \
        .config("spark.eventLog.enabled", "true") \
        .config("spark.eventLog.dir", "/opt/spark/spark-events") \
        .getOrCreate()

    # Load tables into temp views (helper functions defined elsewhere)
    load_transactions(spark)
    load_stores(spark)
    load_countries(spark)

    # Execute join query
    joined_df = spark.sql("""
        SELECT
            transactions.id,
            amount,
            countries.name as country_name,
            employees,
            stores.name as store_name
        FROM transactions
        LEFT JOIN stores ON transactions.store_id = stores.id
        LEFT JOIN countries ON transactions.country_id = countries.id
    """)

    # Writing the result triggers the actual execution of the join
    joined_df.write.mode("overwrite").parquet(
        "/app/" + plugin + "_transact_countries"
    )
job.py implements our core test logic with the join query, while the engine-specific configurations (default Spark, Comet) are passed through the spark-submit commands defined in the Makefile.
run:
	@echo "Submit the spark job with plugin $(plugin)"
	@if [ "$(plugin)" = "comet" ]; then \
		docker exec -it spark-master spark-submit \
			--master spark://spark-master:7077 \
			--conf "spark.plugins=org.apache.spark.CometPlugin" \
			--conf "spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" \
			--conf "spark.comet.exec.shuffle.enabled=true" \
			--conf "spark.comet.exec.replaceSortMergeJoin=true" \
			/app/job.py $(test_type) $(plugin); \
	else \
		docker exec -it spark-master spark-submit \
			--master spark://spark-master:7077 \
			/app/job.py $(test_type) $(plugin); \
	fi
Note: to properly test SortMergeJoin vs ShuffleHashJoin behavior, we need to disable broadcast joins:
--conf "spark.sql.autoBroadcastJoinThreshold=-1"
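If you prefer to keep these settings in one place, one option is to assemble the spark-submit arguments in Python. The sketch below is a hypothetical helper (build_submit_args is not part of the repository); it only reuses the configuration keys shown above and adds the broadcast threshold to every run so that both engines are tested under the same condition.

```python
# Applied to every run: disable broadcast joins so the shuffled join strategies are compared
COMMON_CONFS = {"spark.sql.autoBroadcastJoinThreshold": "-1"}

# Applied only when the Comet plugin is requested (same keys as in the Makefile)
COMET_CONFS = {
    "spark.plugins": "org.apache.spark.CometPlugin",
    "spark.shuffle.manager": "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
    "spark.comet.exec.shuffle.enabled": "true",
    "spark.comet.exec.replaceSortMergeJoin": "true",
}


def build_submit_args(plugin, test_type="join"):
    """Return the spark-submit argument list for the given plugin (hypothetical helper)."""
    confs = dict(COMMON_CONFS)
    if plugin == "comet":
        confs.update(COMET_CONFS)
    args = ["spark-submit", "--master", "spark://spark-master:7077"]
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    args += ["/app/job.py", test_type, plugin]
    return args


default_args = build_submit_args("none")
comet_args = build_submit_args("comet")
```

The resulting list can be passed to subprocess.run, prefixed with the docker exec command used in the Makefile.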
Now, let's run our tests using the Makefile commands:
# Run with default Spark
make run test_type=join plugin=none
# Run with Comet plugin
make run test_type=join plugin=comet
These commands execute the same join query across different configurations while maintaining consistent test conditions. The Makefile approach makes it simple to switch between engines and reproduce our test scenarios.
Performance Analysis
Here's a performance comparison table across the three engines:
This data clearly shows significant performance improvements with Comet and Photon, particularly in reducing memory spills and join duration.
Key Findings
Looking at the performance metrics, several important insights emerge:
Why did ShuffleHashJoin eliminate spills?
The key difference lies in how data is processed:
SortMergeJoin (Default Spark):
Must sort both sides of the join by the join key
Sorting requires significant memory
When memory pressure builds, data spills to disk
Our test showed a 10.8 GiB memory spill and a 1634.3 MiB disk spill
ShuffleHashJoin (Comet/Photon):
Builds a hash table after the shuffle
Only needs to hold the hash table in memory
Streams the other table through for probing
No sorting is required, eliminating major causes of spills
Zero spills in both Comet and Photon tests
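To see why sorting is the step that spills, here's a minimal sketch of an external sort with a bounded in-memory buffer. It is an illustration under stated assumptions, not Spark's actual sorter: external_sort is a hypothetical name, and the in-memory lists called "runs" stand in for the spill files a real engine would write to disk.

```python
import heapq


def external_sort(rows, key, max_rows_in_memory):
    """Sort rows with a bounded buffer; each full buffer becomes one sorted run."""
    runs, buffer = [], []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= max_rows_in_memory:
            # A real engine would write this sorted run to disk (a spill)
            runs.append(sorted(buffer, key=key))
            buffer = []
    if buffer:
        runs.append(sorted(buffer, key=key))
    # Merge all sorted runs back into one sorted sequence
    merged = list(heapq.merge(*runs, key=key))
    return merged, len(runs)


values = [5, 3, 9, 1, 7, 2, 8]

# Small buffer: the input does not fit, so several runs are produced
sorted_small, runs_small = external_sort(values, key=lambda v: v, max_rows_in_memory=3)

# Large buffer: everything fits in memory, so there is a single run
sorted_large, runs_large = external_sort(values, key=lambda v: v, max_rows_in_memory=100)
```

The more rows a task has to sort relative to its memory, the more runs it produces, which is the pattern behind the spill numbers above.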
Looking at our join query between transactions, stores, and countries tables, ShuffleHashJoin's approach is more memory-efficient because:
It builds hash tables from smaller tables (stores/countries)
Streams the large table (transactions) through
Avoids expensive sorting operations
This explains why we see such dramatic improvements in memory and disk spill metrics in our performance tests.
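The build-and-stream pattern for our three-table query can be sketched with Python generators. This is a toy model with assumptions called out: the rows are synthetic (1,000 transactions instead of 150M, made-up names and employee counts), the helper names are hypothetical, and dimension ids are assumed to be unique. What it shows is that only the two small tables are ever materialized, while transactions flow through one row at a time.

```python
def build_hash_table(rows, key):
    """Materialize the small (build) side as a dict; assumes unique keys."""
    return {row[key]: row for row in rows}


def generate_transactions(n, n_stores, n_countries):
    """Lazily yield synthetic transactions so the large side is never held in a list."""
    for i in range(n):
        yield {
            "id": i,
            "amount": float(i % 50),
            "store_id": i % n_stores + 1,
            "country_id": i % n_countries + 1,
        }


def left_join_stream(stream, table, stream_key, prefix, fields):
    """Probe the hash table for each streamed row and yield the joined row."""
    for row in stream:
        match = table.get(row[stream_key])
        joined = dict(row)
        for field in fields:
            joined[prefix + field] = match[field] if match else None
        yield joined


# Synthetic dimension tables with the same sizes as the test: 99 stores, 12 countries
stores = [{"id": i, "name": f"store_{i}", "employees": 5 + i % 20} for i in range(1, 100)]
countries = [{"id": i, "name": f"country_{i}"} for i in range(1, 13)]

store_table = build_hash_table(stores, "id")
country_table = build_hash_table(countries, "id")

# Chain the two left joins, mirroring the structure of the SQL query
pipeline = left_join_stream(
    left_join_stream(
        generate_transactions(1_000, 99, 12),
        store_table, "store_id", "store_", ["name", "employees"],
    ),
    country_table, "country_id", "country_", ["name"],
)

resident_rows = len(store_table) + len(country_table)  # rows held in memory
first = next(pipeline)
remaining = sum(1 for _ in pipeline)  # consume the rest without storing it
```

No matter how many transactions are streamed, the number of resident rows stays at the size of the two dimension tables.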
Why did ShuffleHashJoin prove more efficient than SortMergeJoin?
Our test case works perfectly for ShuffleHashJoin because:
Table Size Pattern
Large transactions table (150M records)
Small dimension tables (99 stores, 12 countries)
Perfect for building hash tables from smaller sides
Join Key Characteristics
Low cardinality (store_id 1-99, country_id 1-12)
Even distribution through modulo operation
Efficient hash table lookup
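As a small illustration of the even key distribution, here's a sketch that assigns keys with a modulo over the row id and counts how many rows each key receives. This is an assumption about how the keys could be generated; the actual logic in prepare_dataset.py may differ, and assign_keys is a hypothetical name.

```python
from collections import Counter

N_STORES, N_COUNTRIES = 99, 12


def assign_keys(row_id):
    """Map a row id to (store_id, country_id) in the ranges 1-99 and 1-12."""
    return row_id % N_STORES + 1, row_id % N_COUNTRIES + 1


# Use a row count divisible by both 99 and 12 so the split is exactly even
n_rows = 99 * 12 * 10

store_counts = Counter()
country_counts = Counter()
for row_id in range(n_rows):
    store_id, country_id = assign_keys(row_id)
    store_counts[store_id] += 1
    country_counts[country_id] += 1

rows_per_store = set(store_counts.values())
rows_per_country = set(country_counts.values())
```

With every key receiving the same number of rows, no single shuffle partition is overloaded, which is the opposite of a skewed join key.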
For different scenarios like:
Joining on high cardinality number ranges
Continuous values instead of discrete IDs
Both tables are large with many unique keys
SortMergeJoin would be more suitable: neither side has to fit in memory as a hash table, and its sorting and streaming approach can spill to disk gracefully.
This highlights why understanding data characteristics is crucial for choosing the right join strategy.
Conclusion
This exploration into join strategies reveals how modern data processing engines are evolving beyond traditional SortMergeJoin approaches. Through systematic testing with a 150M-record dataset, we've seen significant performance gains from both Comet and Photon's implementations of ShuffleHashJoin.
Key takeaways:
ShuffleHashJoin shows clear advantages for small-to-large table joins with low cardinality keys
Modern engines eliminate memory/disk spills through better join strategies
External shuffle managers contribute to overall performance improvements
Understanding data characteristics is crucial for optimal join performance
While ShuffleHashJoin isn't a universal solution - especially for large-to-large joins or high cardinality keys - it represents an important optimization path for specific use cases. This investigation marks just the beginning of exploring how modern data processing tools can enhance query performance through intelligent strategy choices.
To explore these concepts hands-on, you can find the complete test setup, data generation scripts, and detailed instructions in the GitHub repository: https://github.com/hoaihuongbk/spark-shuffle-experience