diff --git a/benchmarks/pyspark/benchmarks/__init__.py b/benchmarks/pyspark/benchmarks/__init__.py
index 7d913a7d6d..6208be4b0d 100644
--- a/benchmarks/pyspark/benchmarks/__init__.py
+++ b/benchmarks/pyspark/benchmarks/__init__.py
@@ -26,12 +26,14 @@
 
 from .base import Benchmark
 from .shuffle import ShuffleHashBenchmark, ShuffleRoundRobinBenchmark
+from .shuffle_size import ShuffleSizeBenchmark
 
 
 # Registry of all available benchmarks
 _BENCHMARK_REGISTRY: Dict[str, Type[Benchmark]] = {
     ShuffleHashBenchmark.name(): ShuffleHashBenchmark,
     ShuffleRoundRobinBenchmark.name(): ShuffleRoundRobinBenchmark,
+    ShuffleSizeBenchmark.name(): ShuffleSizeBenchmark,
 }
 
 
@@ -76,4 +78,5 @@ def list_benchmarks() -> List[tuple[str, str]]:
     'list_benchmarks',
     'ShuffleHashBenchmark',
     'ShuffleRoundRobinBenchmark',
+    'ShuffleSizeBenchmark',
 ]
diff --git a/benchmarks/pyspark/benchmarks/shuffle_size.py b/benchmarks/pyspark/benchmarks/shuffle_size.py
new file mode 100644
index 0000000000..9e5d5fbcbd
--- /dev/null
+++ b/benchmarks/pyspark/benchmarks/shuffle_size.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Shuffle size benchmark for measuring shuffle write bytes.
+
+Measures the actual shuffle file sizes on disk to compare
+shuffle file sizes between Spark and Comet shuffle implementations.
+This is useful for investigating shuffle format overhead (see issue #3882).
+
+The benchmark sets spark.local.dir to a dedicated temp directory and
+measures the total size of shuffle data files (.data) written there.
+"""
+
+import json
+import os
+import urllib.request
+from typing import Dict, Any
+
+from pyspark.sql import DataFrame
+
+from .base import Benchmark
+
+
+def get_shuffle_write_bytes(spark) -> int:
+    """Get total shuffle write bytes from the Spark REST API."""
+    sc = spark.sparkContext
+    ui_url = sc.uiWebUrl
+    url = f"{ui_url}/api/v1/applications/{sc.applicationId}/stages"
+    with urllib.request.urlopen(url) as resp:
+        stages = json.loads(resp.read())
+    return sum(s.get("shuffleWriteBytes", 0) for s in stages)
+
+
+def get_shuffle_disk_bytes(local_dir: str) -> int:
+    """Walk spark.local.dir and sum the sizes of all shuffle .data files."""
+    total = 0
+    for root, _dirs, files in os.walk(local_dir):
+        for f in files:
+            if f.endswith(".data"):
+                total += os.path.getsize(os.path.join(root, f))
+    return total
+
+
+def format_bytes(b: int) -> str:
+    """Format byte count as human-readable string."""
+    if b >= 1024 ** 3:
+        return f"{b / 1024 ** 3:.2f} GiB"
+    elif b >= 1024 ** 2:
+        return f"{b / 1024 ** 2:.2f} MiB"
+    else:
+        return f"{b / 1024:.2f} KiB"
+
+
+class ShuffleSizeBenchmark(Benchmark):
+    """
+    Benchmark that measures shuffle write bytes on disk.
+
+    Runs a simple scan -> repartition -> write pipeline and reports
+    the actual shuffle data file sizes alongside the Spark REST API
+    metric. Useful for comparing shuffle format overhead between
+    Spark and Comet.
+
+    NOTE: The Spark session must be configured with spark.local.dir
+    pointing to a dedicated empty directory so that we can measure
+    shuffle file sizes accurately. The run_shuffle_size_benchmark.sh
+    script handles this automatically.
+    """
+
+    def __init__(self, spark, data_path: str, mode: str,
+                 num_partitions: int = 200):
+        super().__init__(spark, data_path, mode)
+        self.num_partitions = num_partitions
+
+    @classmethod
+    def name(cls) -> str:
+        return "shuffle-size"
+
+    @classmethod
+    def description(cls) -> str:
+        return "Measure shuffle write bytes (scan -> repartition -> write)"
+
+    def run(self) -> Dict[str, Any]:
+        df = self.spark.read.parquet(self.data_path)
+        row_count = df.count()
+        print(f"Input rows: {row_count:,}")
+
+        schema_desc = ", ".join(
+            f"{f.name}: {f.dataType.simpleString()}" for f in df.schema.fields
+        )
+        print(f"Schema: {schema_desc}")
+
+        # Read spark.local.dir so we can measure shuffle files on disk
+        local_dir = self.spark.sparkContext.getConf().get(
+            "spark.local.dir", "/tmp"
+        )
+
+        output_path = (
+            f"/tmp/shuffle-size-benchmark-output-{self.mode}"
+        )
+
+        def benchmark_operation():
+            df.repartition(self.num_partitions).write.mode(
+                "overwrite"
+            ).parquet(output_path)
+
+        duration_ms = self._time_operation(benchmark_operation)
+
+        # Measure actual shuffle file sizes on disk.
+        # Shuffle .data files persist until SparkContext shutdown,
+        # so they are still available after the job completes.
+        disk_bytes = get_shuffle_disk_bytes(local_dir)
+
+        # Also grab the REST API metric for comparison
+        api_bytes = 0
+        try:
+            api_bytes = get_shuffle_write_bytes(self.spark)
+        except Exception as e:
+            print(f"Warning: could not read shuffle metrics from REST API: {e}")
+
+        disk_bpr = disk_bytes / row_count if row_count > 0 else 0
+        api_bpr = api_bytes / row_count if row_count > 0 else 0
+
+        print(f"Shuffle disk: {format_bytes(disk_bytes)} "
+              f"({disk_bpr:.1f} B/record)")
+        print(f"Shuffle API metric: {format_bytes(api_bytes)} "
+              f"({api_bpr:.1f} B/record)")
+
+        return {
+            "duration_ms": duration_ms,
+            "row_count": row_count,
+            "num_partitions": self.num_partitions,
+            "shuffle_disk_bytes": disk_bytes,
+            "shuffle_disk_bytes_per_record": round(disk_bpr, 1),
+            "shuffle_api_bytes": api_bytes,
+            "shuffle_api_bytes_per_record": round(api_bpr, 1),
+        }
diff --git a/benchmarks/pyspark/generate_data.py b/benchmarks/pyspark/generate_data.py
index d8be47d6e0..812f359740 100755
--- a/benchmarks/pyspark/generate_data.py
+++ b/benchmarks/pyspark/generate_data.py
@@ -412,6 +412,53 @@ def generate_data(output_path: str, num_rows: int, num_partitions: int):
     spark.stop()
 
+
+def generate_short_strings_data(output_path: str, num_rows: int,
+                                num_partitions: int):
+    """Generate data matching the schema from issue #3882.
+
+    Reproduces the problematic scenario: 7 short unique string columns + 1
+    timestamp column. The original reporter saw 3x shuffle overhead with
+    204M records of this shape (25.1 B/record in Comet vs 8.3 B/record in
+    Spark).
+    """
+
+    spark = SparkSession.builder \
+        .appName("ShuffleBenchmark-DataGen-ShortStrings") \
+        .getOrCreate()
+
+    print(f"Generating {num_rows:,} rows with {num_partitions} partitions")
+    print(f"Output path: {output_path}")
+    print("Schema: 7 short unique string columns + 1 timestamp (issue #3882)")
+
+    df = spark.range(0, num_rows, numPartitions=num_partitions)
+
+    # 7 short random string columns + 1 timestamp, mimicking the reporter's
+    # schema. Uses uuid() to generate truly random strings that defeat
+    # compression, exposing Arrow IPC per-batch overhead.
+    df = df.selectExpr(
+        "substring(uuid(), 1, 8) as str_col_1",
+        "substring(uuid(), 1, 8) as str_col_2",
+        "substring(uuid(), 1, 8) as str_col_3",
+        "substring(uuid(), 1, 8) as str_col_4",
+        "substring(uuid(), 1, 8) as str_col_5",
+        "substring(uuid(), 1, 8) as str_col_6",
+        "substring(uuid(), 1, 8) as str_col_7",
+        # Timestamp column
+        "timestamp_seconds(1600000000 + id) as ts_col",
+    )
+
+    print(f"Generated schema with {len(df.columns)} columns")
+    df.printSchema()
+
+    df.write.mode("overwrite").parquet(output_path)
+
+    written_df = spark.read.parquet(output_path)
+    actual_count = written_df.count()
+    print(f"Wrote {actual_count:,} rows to {output_path}")
+
+    spark.stop()
+
 def main():
     parser = argparse.ArgumentParser(
         description="Generate test data for shuffle benchmark"
     )
@@ -433,13 +480,24 @@
         default=None,
         help="Number of output partitions (default: auto based on cluster)"
     )
+    parser.add_argument(
+        "--schema", "-s",
+        choices=["wide", "short-strings"],
+        default="wide",
+        help="Schema to generate: 'wide' (100 columns with nested types) "
+             "or 'short-strings' (7 short unique strings + 1 timestamp, "
+             "matches issue #3882)"
+    )
 
     args = parser.parse_args()
 
     # Default partitions to a reasonable number if not specified
     num_partitions = args.partitions if args.partitions else 200
 
-    generate_data(args.output, args.rows, num_partitions)
+    if args.schema == "short-strings":
+        generate_short_strings_data(args.output, args.rows, num_partitions)
+    else:
+        generate_data(args.output, args.rows, num_partitions)
 
 
 if __name__ == "__main__":
diff --git a/benchmarks/pyspark/run_shuffle_size_benchmark.sh b/benchmarks/pyspark/run_shuffle_size_benchmark.sh
new file mode 100755
index 0000000000..e55d219faa
--- /dev/null
+++ b/benchmarks/pyspark/run_shuffle_size_benchmark.sh
@@ -0,0 +1,123 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Compare shuffle write sizes between Spark and Comet shuffle.
+#
+# This benchmark measures actual shuffle write bytes reported by Spark
+# to quantify the overhead of Comet's Arrow IPC shuffle format.
+# See https://github.com/apache/datafusion-comet/issues/3882
+#
+# Prerequisites:
+# - SPARK_HOME set to a Spark 3.5 installation
+# - Comet JAR built (make)
+# - Input parquet data generated (see generate_data.py)
+#
+# Usage:
+#   ./run_shuffle_size_benchmark.sh /path/to/parquet/data
+#
+# Environment variables:
+#   COMET_JAR        Path to Comet JAR (default: auto-detected from repo)
+#   SPARK_MASTER     Spark master URL (default: local[*])
+#   EXECUTOR_MEMORY  Executor memory (default: 16g)
+#   OFFHEAP_SIZE     Off-heap memory for Comet (default: 16g)
+
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+DATA_PATH="${1:?Usage: $0 /path/to/parquet/data}"
+COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../../spark/target/comet-spark-spark3.5_2.12-0.15.0-SNAPSHOT.jar}"
+SPARK_MASTER="${SPARK_MASTER:-local[*]}"
+EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-16g}"
+OFFHEAP_SIZE="${OFFHEAP_SIZE:-16g}"
+
+if [ -z "$SPARK_HOME" ]; then
+    echo "Error: SPARK_HOME is not set"
+    exit 1
+fi
+
+if [ ! -f "$COMET_JAR" ]; then
+    echo "Error: Comet JAR not found at $COMET_JAR"
+    echo "Build with 'make' or set COMET_JAR to the correct path."
+    exit 1
+fi
+
+echo "========================================"
+echo "Shuffle Size Comparison Benchmark"
+echo "========================================"
+echo "Data path: $DATA_PATH"
+echo "Comet JAR: $COMET_JAR"
+echo "Spark master: $SPARK_MASTER"
+echo "Executor memory: $EXECUTOR_MEMORY"
+echo "Off-heap size: $OFFHEAP_SIZE"
+echo "========================================"
+
+# Use dedicated local dirs so we can measure actual shuffle file sizes on disk
+SPARK_LOCAL_DIR=$(mktemp -d /tmp/spark-shuffle-bench-spark-XXXXXX)
+COMET_LOCAL_DIR=$(mktemp -d /tmp/spark-shuffle-bench-comet-XXXXXX)
+
+cleanup() {
+    rm -rf "$SPARK_LOCAL_DIR" "$COMET_LOCAL_DIR"
+}
+trap cleanup EXIT
+
+# Run Spark baseline (no Comet)
+echo ""
+echo ">>> Running SPARK (no Comet) shuffle size benchmark..."
+$SPARK_HOME/bin/spark-submit \
+    --master "$SPARK_MASTER" \
+    --driver-memory "$EXECUTOR_MEMORY" \
+    --executor-memory "$EXECUTOR_MEMORY" \
+    --conf spark.local.dir="$SPARK_LOCAL_DIR" \
+    --conf spark.comet.enabled=false \
+    "$SCRIPT_DIR/run_benchmark.py" \
+    --data "$DATA_PATH" \
+    --mode spark \
+    --benchmark shuffle-size
+
+# Run Comet Native shuffle
+echo ""
+echo ">>> Running COMET NATIVE shuffle size benchmark..."
+$SPARK_HOME/bin/spark-submit \
+    --master "$SPARK_MASTER" \
+    --driver-memory "$EXECUTOR_MEMORY" \
+    --executor-memory "$EXECUTOR_MEMORY" \
+    --jars "$COMET_JAR" \
+    --driver-class-path "$COMET_JAR" \
+    --conf spark.executor.extraClassPath="$COMET_JAR" \
+    --conf spark.local.dir="$COMET_LOCAL_DIR" \
+    --conf spark.plugins=org.apache.spark.CometPlugin \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+    --conf spark.memory.offHeap.enabled=true \
+    --conf spark.memory.offHeap.size="$OFFHEAP_SIZE" \
+    --conf spark.comet.enabled=true \
+    --conf spark.comet.exec.shuffle.enabled=true \
+    --conf spark.comet.exec.shuffle.mode=native \
+    --conf spark.comet.explainFallback.enabled=true \
+    "$SCRIPT_DIR/run_benchmark.py" \
+    --data "$DATA_PATH" \
+    --mode native \
+    --benchmark shuffle-size
+
+echo ""
+echo "========================================"
+echo "BENCHMARK COMPLETE"
+echo "========================================"
+echo "Compare 'Shuffle disk' bytes/record between the two runs above."