bench: [DO NOT MERGE] single spill file for multiple partitions #3904
Draft
Shekharrajak wants to merge 25 commits into apache:main from
Conversation
Add a `shuffle_bench` binary that benchmarks shuffle write and read performance independently from Spark, making it easy to profile with tools like `cargo flamegraph`, `perf`, or `instruments`. Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating synthetic data with configurable schema. Covers different scenarios including compression codecs, partition counts, partitioning schemes, and memory-constrained spilling.
…arquet

- Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit the number of batches buffered before spilling, allowing earlier spilling to reduce peak memory usage on executors
- Fix too-many-open-files: close the spill file FD after each spill and reopen in append mode, rather than holding one FD open per partition
- Refactor shuffle_bench to stream directly from Parquet instead of loading all input data into memory; remove synthetic data generation
- Add `--max-buffered-batches` CLI arg to shuffle_bench
- Add shuffle benchmark documentation to README
Merge latest from apache/main, resolve conflicts, and strip out COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing. This branch now only adds the shuffle benchmark binary.
Spawns N parallel shuffle tasks to simulate executor parallelism. Each task reads the same input and writes to its own output files. Extracts core shuffle logic into shared async helper to avoid code duplication between single and concurrent paths.
Revert new shuffle metrics (interleave_time, coalesce_time, memcopy_time) to keep PR focused on the benchmark tool. Remove read-back functionality from shuffle_bench to focus on write performance. Remove undocumented --max-buffered-batches option from README.
Actual output from main branch + #3752
Final Benchmark Results (corrected output size and byte writes). Configuration: 5M rows, 10 columns, 200 partitions, zstd(1), 50 MB memory limit, 10 spill events, 3 iterations + 1 warmup.
Ref #3903 #3752
Rationale for this change
Benchmarking
What changes are included in this PR?
Analysis
A 19% overall improvement, driven by a 67% reduction in the I/O write-time component (0.113 s down to 0.037 s). The reduction comes from creating 10 temp files instead of up to 200 during spill events.
Encode and repartition times are unchanged.
Output size difference (226 MiB vs 4.38 MiB): The baseline writes uncompressed partition data to individual spill files, then copies raw bytes into the final output. This PR writes compressed data during the spill phase, so the final output file contains already-compressed data. This is an additional benefit of consolidating writes -- compression is applied once during spill rather than being bypassed during raw copy.
Spill count and in-memory data size are identical, confirming the memory pressure behavior is unchanged.
Scaling characteristics: The improvement scales with the ratio N/S (partitions to spill events). With 200 partitions and 10 spills, temp file count drops from 200 to 10. In production Spark jobs with spark.sql.shuffle.partitions=2000 and frequent spills, the reduction would be more significant.