
bench: [DO NOT MERGE] single spill file for multiple partitions #3904

Draft

Shekharrajak wants to merge 25 commits into apache:main from Shekharrajak:feature/single-spill-file-shuffle-3859-bench

Conversation

@Shekharrajak (Contributor) commented Apr 6, 2026

Refs #3903, #3752

Rationale for this change

Benchmarking.

What changes are included in this PR?

  1. Generate test data (5M rows, ~226 MiB Parquet)
  2. Run the baseline benchmark on the main branch:
cd native && cargo run --release --features shuffle-bench --bin shuffle_bench -- \
  --input /tmp/bench_data/ \
  --partitions 200 \
  --codec zstd --zstd-level 1 \
  --hash-columns 0,3 \
  --memory-limit 50000000 \
  --iterations 3 \
  --warmup 1 \
  --output-dir /tmp/comet_shuffle_bench_baseline
  3. Run the same benchmark on this branch:
cd native && cargo run --release --features shuffle-bench --bin shuffle_bench -- \
  --input /tmp/bench_data/ \
  --partitions 200 \
  --codec zstd --zstd-level 1 \
  --hash-columns 0,3 \
  --memory-limit 50000000 \
  --iterations 3 \
  --warmup 1 \
  --output-dir /tmp/comet_shuffle_bench_new
| Metric | main (baseline) | This PR | Delta |
|---|---|---|---|
| avg write time | 1.376s | 1.113s | -19.1% |
| min write time | 1.349s | 1.106s | -18.0% |
| max write time | 1.415s | 1.126s | -20.4% |
| throughput | 3,633,169 rows/s | 4,490,434 rows/s | +23.6% |
| write time (I/O) | 0.113s (8.2%) | 0.037s (3.3%) | -67.3% |
| encode time | 0.745s (54.1%) | 0.726s (65.2%) | -2.5% (noise) |
| repart time | 0.041s (3.0%) | 0.041s (3.7%) | 0% |
| output size | 226.20 MiB | 4.38 MiB | see analysis |
| spill count | 10 | 10 | identical |
| data size (in-memory) | 440.36 MiB | 440.36 MiB | identical |

Analysis
19% overall improvement driven by a 67% reduction in the I/O write time component (0.113s down to 0.037s). The reduction comes from creating 10 temp files instead of up to 200 during spill events.
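The consolidation idea can be sketched as follows. This is a minimal illustration with hypothetical names (`SpillEntry`, `spill_all_partitions`), not the actual Comet implementation: during one spill event, every partition's buffered bytes are appended to a single shared file, and an index of (partition, offset, length) entries replaces the per-partition temp files.

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};

/// Index entry recording where one partition's bytes live in the spill file.
#[derive(Debug)]
struct SpillEntry {
    partition: usize,
    offset: u64,
    len: u64,
}

/// Append every non-empty partition buffer to one shared spill file,
/// returning an index that lets the read side seek back to each partition.
/// One file is created per spill event instead of one per partition.
fn spill_all_partitions(file: &mut File, buffers: &[Vec<u8>]) -> std::io::Result<Vec<SpillEntry>> {
    let mut index = Vec::new();
    let mut offset = file.seek(SeekFrom::End(0))?;
    for (partition, buf) in buffers.iter().enumerate() {
        if buf.is_empty() {
            continue;
        }
        file.write_all(buf)?;
        index.push(SpillEntry { partition, offset, len: buf.len() as u64 });
        offset += buf.len() as u64;
    }
    Ok(index)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("single_spill_demo.bin");
    let mut file = File::create(&path)?;
    // One buffer of (already encoded) bytes per partition; partition 1 is empty.
    let buffers = vec![b"aaaa".to_vec(), Vec::new(), b"cc".to_vec()];
    let index = spill_all_partitions(&mut file, &buffers)?;
    // Only non-empty partitions get index entries; partition 2 starts at offset 4.
    assert_eq!(index.len(), 2);
    assert_eq!(index[1].offset, 4);

    // Read partition 2 back through its index entry.
    let mut reader = File::open(&path)?;
    reader.seek(SeekFrom::Start(index[1].offset))?;
    let mut out = vec![0u8; index[1].len as usize];
    reader.read_exact(&mut out)?;
    assert_eq!(out, b"cc".to_vec());
    std::fs::remove_file(&path)?;
    Ok(())
}
```

The index is what makes a single file workable: the writer never needs to reopen or interleave files, and the reader can still address each partition's slice directly.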

Encode and repartition times are unchanged

Output size difference (226 MiB vs 4.38 MiB): The baseline writes uncompressed partition data to individual spill files, then copies those raw bytes into the final output. This PR writes compressed data during the spill phase, so the final output file contains already-compressed data. This is an additional benefit of consolidating writes: compression is applied once, at spill time, rather than being bypassed during the raw copy.
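The raw-copy step described above can be pictured like this (a sketch with hypothetical names, not the real writer): because each spill slice already holds compressed bytes, assembling the final output per partition is just a seek plus a byte copy, with no decompress/recompress round trip.

```rust
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};

/// Hypothetical index entry describing where one partition's compressed
/// bytes live inside a spill file.
struct SpillSlice {
    offset: u64,
    len: u64,
}

/// Copy one partition's already-compressed bytes from a spill file straight
/// into the final shuffle output. The bytes were compressed at spill time,
/// so this is a pure byte copy.
fn copy_partition(spill: &mut File, slice: &SpillSlice, out: &mut impl Write) -> io::Result<()> {
    spill.seek(SeekFrom::Start(slice.offset))?;
    let mut buf = vec![0u8; slice.len as usize];
    spill.read_exact(&mut buf)?;
    out.write_all(&buf)
}

fn main() -> io::Result<()> {
    let spill_path = std::env::temp_dir().join("spill_copy_demo.bin");
    // Two fake "compressed" slices, back to back in one spill file.
    std::fs::write(&spill_path, b"P0P0P1")?;
    let slices = [
        SpillSlice { offset: 0, len: 4 },
        SpillSlice { offset: 4, len: 2 },
    ];

    let mut spill = File::open(&spill_path)?;
    let mut output = Vec::new(); // stands in for the final output file
    for s in &slices {
        copy_partition(&mut spill, s, &mut output)?;
    }
    assert_eq!(output, b"P0P0P1".to_vec());
    std::fs::remove_file(&spill_path)?;
    Ok(())
}
```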

Spill count and in-memory data size are identical, confirming the memory pressure behavior is unchanged.

Scaling characteristics: The improvement scales with the ratio N/S (partitions to spill events). With 200 partitions and 10 spills, temp file count drops from 200 to 10. In production Spark jobs with spark.sql.shuffle.partitions=2000 and frequent spills, the reduction would be more significant.
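Under the simplified model above (baseline: up to one temp file per partition; this PR: one file per spill event), the reduction factor is just N/S. The spill-event counts below other than the benchmark's own 200/10 are hypothetical:

```rust
/// Temp-file reduction factor under the simplified model: the baseline
/// creates up to `partitions` temp files, while the consolidated writer
/// creates one file per spill event.
fn reduction_factor(partitions: u64, spill_events: u64) -> u64 {
    partitions / spill_events
}

fn main() {
    // The benchmark config: 200 partitions, 10 spill events -> 20x fewer files.
    assert_eq!(reduction_factor(200, 10), 20);
    // A hypothetical production config with spark.sql.shuffle.partitions=2000
    // and the same 10 spill events -> 200x fewer files.
    assert_eq!(reduction_factor(2000, 10), 200);
}
```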

andygrove and others added 24 commits March 21, 2026 07:43
Add a `shuffle_bench` binary that benchmarks shuffle write and read
performance independently from Spark, making it easy to profile with
tools like `cargo flamegraph`, `perf`, or `instruments`.

Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating
synthetic data with configurable schema. Covers different scenarios
including compression codecs, partition counts, partitioning schemes,
and memory-constrained spilling.
…arquet

- Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit
  the number of batches buffered before spilling, allowing earlier
  spilling to reduce peak memory usage on executors
- Fix too-many-open-files: close spill file FD after each spill and
  reopen in append mode, rather than holding one FD open per partition
- Refactor shuffle_bench to stream directly from Parquet instead of
  loading all input data into memory; remove synthetic data generation
- Add --max-buffered-batches CLI arg to shuffle_bench
- Add shuffle benchmark documentation to README
Merge latest from apache/main, resolve conflicts, and strip out
COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing.
This branch now only adds the shuffle benchmark binary.
Spawns N parallel shuffle tasks to simulate executor parallelism.
Each task reads the same input and writes to its own output files.
Extracts core shuffle logic into shared async helper to avoid
code duplication between single and concurrent paths.
Revert new shuffle metrics (interleave_time, coalesce_time,
memcopy_time) to keep PR focused on the benchmark tool. Remove
read-back functionality from shuffle_bench to focus on write
performance. Remove undocumented --max-buffered-batches option
from README.
@Shekharrajak changed the title from "bench: single spill file for multiple partitions" to "bench: [DO NOT MERGE] single spill file for multiple partitions" on Apr 6, 2026
@Shekharrajak (Contributor, Author)

Actual output from main branch + #3752

    Finished `release` profile [optimized + debuginfo] target(s) in 0.74s
     Running `target/release/shuffle_bench --input /tmp/bench_data/ --partitions 200 --codec zstd --zstd-level 1 --hash-columns 0,3 --memory-limit 50000000 --iterations 3 --warmup 1 --output-dir /tmp/comet_shuffle_bench_baseline`
=== Shuffle Benchmark ===
Input:          /tmp/bench_data/
Schema:         10 columns (2xstring, 4xfloat, 4xint)
Total rows:     5,000,000
Batch size:     8,192
Partitioning:   hash
Partitions:     200
Codec:          Zstd(1)
Hash columns:   [0, 3]
Memory limit:   47.68 MiB
Iterations:     3 (warmup: 1)

  [warmup 1/1] write: 1.442s  output: 225.81 MiB
  [iter 1/3] write: 1.365s  output: 226.29 MiB
  [iter 2/3] write: 1.415s  output: 226.14 MiB
  [iter 3/3] write: 1.349s  output: 226.16 MiB

=== Results ===
Write:
  avg time:         1.376s
  min/max:          1.349s / 1.415s
  throughput:       3,633,169 rows/s (total across 1 tasks)
  output size:      226.20 MiB

Input Metrics (last iteration):
  time_elapsed_processing: 0.110s
  time_elapsed_opening: 0.003s
  metadata_load_time: 0.001s
  time_elapsed_scanning_total: 5.233s
  statistics_eval_time: 0.000s
  output_rows: 10,000,000
  row_pushdown_eval_time: 0.000s
  bytes_scanned: 226.52 MiB
  elapsed_compute: 0.000s
  page_index_eval_time: 0.000s
  output_bytes: 879.54 MiB
  output_batches: 1,224
  bloom_filter_eval_time: 0.000s
  time_elapsed_scanning_until_data: 0.048s

Shuffle Metrics (last iteration):
  input batches:    612
  repart time:      0.041s (3.0%)
  encode time:      0.745s (54.1%)
  write time:       0.113s (8.2%)
  spill count:      10
  data size:        440.36 MiB

Actual output from this branch's code changes (#3903 + #3752)

    Finished `release` profile [optimized + debuginfo] target(s) in 0.64s
     Running `target/release/shuffle_bench --input /tmp/bench_data/ --partitions 200 --codec zstd --zstd-level 1 --hash-columns 0,3 --memory-limit 50000000 --iterations 3 --warmup 1 --output-dir /tmp/comet_shuffle_bench_new`
=== Shuffle Benchmark ===
Input:          /tmp/bench_data/
Schema:         10 columns (2xstring, 4xfloat, 4xint)
Total rows:     5,000,000
Batch size:     8,192
Partitioning:   hash
Partitions:     200
Codec:          Zstd(1)
Hash columns:   [0, 3]
Memory limit:   47.68 MiB
Iterations:     3 (warmup: 1)

  [warmup 1/1] write: 1.296s  output: 4.35 MiB
  [iter 1/3] write: 1.126s  output: 4.36 MiB
  [iter 2/3] write: 1.106s  output: 4.39 MiB
  [iter 3/3] write: 1.108s  output: 4.40 MiB

=== Results ===
Write:
  avg time:         1.113s
  min/max:          1.106s / 1.126s
  throughput:       4,490,434 rows/s (total across 1 tasks)
  output size:      4.38 MiB

Input Metrics (last iteration):
  bytes_scanned: 226.52 MiB
  time_elapsed_opening: 0.001s
  time_elapsed_scanning_until_data: 0.055s
  metadata_load_time: 0.000s
  output_rows: 10,000,000
  statistics_eval_time: 0.000s
  bloom_filter_eval_time: 0.000s
  output_batches: 1,224
  elapsed_compute: 0.000s
  output_bytes: 879.54 MiB
  row_pushdown_eval_time: 0.000s
  time_elapsed_scanning_total: 4.645s
  page_index_eval_time: 0.000s
  time_elapsed_processing: 0.111s

Shuffle Metrics (last iteration):
  input batches:    612
  repart time:      0.041s (3.7%)
  encode time:      0.726s (65.2%)
  write time:       0.037s (3.3%)
  spill count:      10
  data size:        440.36 MiB

@Shekharrajak (Contributor, Author)

Final Benchmark Results (Corrected output size and byte writes)

Configuration: 5M rows, 10 columns, 200 partitions, zstd(1), 50 MB memory limit, 10 spill events, 3 iterations + 1 warmup

| Metric | main (baseline) | This PR (optimized) | Delta |
|---|---|---|---|
| avg write time | 1.354s | 1.193s | -11.9% |
| min write time | 1.323s | 1.186s | -10.4% |
| max write time | 1.387s | 1.196s | -13.8% |
| throughput | 3,693,244 rows/s | 4,192,690 rows/s | +13.5% |
| write time (I/O) | 0.111s (8.2%) | 0.114s (9.6%) | +2.7% (noise) |
| encode time | 0.747s (55.2%) | 0.730s (61.3%) | -2.3% (noise) |
| repart time | 0.040s (3.0%) | 0.042s (3.5%) | +5% (noise) |
| output size | 226.16 MiB | 226.21 MiB | identical |
| spill count | 10 | 10 | identical |
| spilled bytes | -- | 221.74 MiB | -- |
| data size | 440.36 MiB | 440.36 MiB | identical |

@kazuyukitanimura kazuyukitanimura marked this pull request as draft April 8, 2026 20:24