Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions benchmarks/pyspark/benchmarks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@

from .base import Benchmark
from .shuffle import ShuffleHashBenchmark, ShuffleRoundRobinBenchmark
from .shuffle_size import ShuffleSizeBenchmark


# Registry of all available benchmarks.
# Maps each benchmark's self-reported name (the classmethod `name()`) to its
# class — presumably looked up by the CLI's --benchmark flag in
# run_benchmark.py; verify against that caller.
_BENCHMARK_REGISTRY: Dict[str, Type[Benchmark]] = {
    ShuffleHashBenchmark.name(): ShuffleHashBenchmark,
    ShuffleRoundRobinBenchmark.name(): ShuffleRoundRobinBenchmark,
    ShuffleSizeBenchmark.name(): ShuffleSizeBenchmark,
}


Expand Down Expand Up @@ -76,4 +78,5 @@ def list_benchmarks() -> List[tuple[str, str]]:
'list_benchmarks',
'ShuffleHashBenchmark',
'ShuffleRoundRobinBenchmark',
'ShuffleSizeBenchmark',
]
152 changes: 152 additions & 0 deletions benchmarks/pyspark/benchmarks/shuffle_size.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Shuffle size benchmark for measuring shuffle write bytes.

Measures the actual shuffle file sizes on disk to compare
shuffle file sizes between Spark and Comet shuffle implementations.
This is useful for investigating shuffle format overhead (see issue #3882).

The benchmark sets spark.local.dir to a dedicated temp directory and
measures the total size of shuffle data files (.data) written there.
"""

import json
import os
import urllib.request
from typing import Dict, Any

from pyspark.sql import DataFrame

from .base import Benchmark


def get_shuffle_write_bytes(spark) -> int:
    """Get total shuffle write bytes from the Spark REST API.

    Queries the driver UI's ``/stages`` endpoint and sums the
    ``shuffleWriteBytes`` metric over every stage of the current
    application.

    Args:
        spark: An active SparkSession; its driver UI URL and application
            id are used to build the REST endpoint.

    Returns:
        Total shuffle write bytes across all stages (0 when no stage
        reports the metric).

    Raises:
        urllib.error.URLError / OSError: if the UI endpoint is
            unreachable or times out; ValueError if the response body is
            not valid JSON. Callers are expected to treat failures here
            as non-fatal.
    """
    sc = spark.sparkContext
    ui_url = sc.uiWebUrl
    url = f"{ui_url}/api/v1/applications/{sc.applicationId}/stages"
    # Fix: the original call had no timeout, so an unresponsive UI
    # endpoint would hang the benchmark indefinitely. 30s is generous
    # for a local REST call.
    with urllib.request.urlopen(url, timeout=30) as resp:
        stages = json.loads(resp.read())
    return sum(s.get("shuffleWriteBytes", 0) for s in stages)


def get_shuffle_disk_bytes(local_dir: str) -> int:
    """Sum the on-disk sizes of all shuffle ``.data`` files under *local_dir*.

    Recursively walks the directory tree rooted at *local_dir* and adds up
    the byte size of every file whose name ends in ``.data`` (the suffix
    Spark uses for shuffle data files). Returns 0 for an empty or
    non-existent directory.
    """
    data_file_sizes = (
        os.path.getsize(os.path.join(parent, fname))
        for parent, _subdirs, fnames in os.walk(local_dir)
        for fname in fnames
        if fname.endswith(".data")
    )
    return sum(data_file_sizes)


def format_bytes(b: int) -> str:
    """Render a byte count as a human-readable KiB/MiB/GiB string.

    Values below 1 MiB are shown in KiB (including values under 1 KiB,
    which come out fractional), with two decimal places throughout.
    """
    KIB = 1024
    MIB = KIB ** 2
    GIB = KIB ** 3
    if b >= GIB:
        return f"{b / GIB:.2f} GiB"
    if b >= MIB:
        return f"{b / MIB:.2f} MiB"
    return f"{b / KIB:.2f} KiB"


class ShuffleSizeBenchmark(Benchmark):
    """
    Benchmark comparing shuffle write sizes on disk.

    Executes a scan -> repartition -> parquet-write pipeline and reports
    the size of the shuffle ``.data`` files actually written under
    spark.local.dir, alongside the shuffle-write metric exposed by the
    Spark REST API. Intended for quantifying shuffle format overhead
    between vanilla Spark and Comet.

    NOTE: spark.local.dir must point at a dedicated, empty directory for
    the on-disk measurement to be meaningful; the
    run_shuffle_size_benchmark.sh wrapper sets this up automatically.
    """

    def __init__(self, spark, data_path: str, mode: str,
                 num_partitions: int = 200):
        super().__init__(spark, data_path, mode)
        # Target partition count for the repartition() that drives the shuffle.
        self.num_partitions = num_partitions

    @classmethod
    def name(cls) -> str:
        """CLI identifier used to select this benchmark."""
        return "shuffle-size"

    @classmethod
    def description(cls) -> str:
        """One-line human-readable summary for benchmark listings."""
        return "Measure shuffle write bytes (scan -> repartition -> count)"

    def run(self) -> Dict[str, Any]:
        """Run the pipeline and return timing plus shuffle-size metrics."""
        input_df = self.spark.read.parquet(self.data_path)
        n_rows = input_df.count()
        print(f"Input rows: {n_rows:,}")

        field_descs = [
            f"{fld.name}: {fld.dataType.simpleString()}"
            for fld in input_df.schema.fields
        ]
        print(f"Schema: {', '.join(field_descs)}")

        # spark.local.dir tells us where shuffle files land on disk.
        shuffle_dir = self.spark.sparkContext.getConf().get(
            "spark.local.dir", "/tmp"
        )

        dest_path = f"/tmp/shuffle-size-benchmark-output-{self.mode}"

        # The timed operation: force a full shuffle via repartition, then
        # materialize the result with a parquet write.
        elapsed_ms = self._time_operation(
            lambda: input_df.repartition(self.num_partitions)
            .write.mode("overwrite")
            .parquet(dest_path)
        )

        # Shuffle .data files persist until SparkContext shutdown, so
        # they can still be sized after the job completes.
        bytes_on_disk = get_shuffle_disk_bytes(shuffle_dir)

        # REST API metric for comparison; failures here are non-fatal.
        try:
            bytes_from_api = get_shuffle_write_bytes(self.spark)
        except Exception as err:
            bytes_from_api = 0
            print(f"Warning: could not read shuffle metrics from REST API: {err}")

        per_rec_disk = bytes_on_disk / n_rows if n_rows > 0 else 0
        per_rec_api = bytes_from_api / n_rows if n_rows > 0 else 0

        print(f"Shuffle disk: {format_bytes(bytes_on_disk)} "
              f"({per_rec_disk:.1f} B/record)")
        print(f"Shuffle API metric: {format_bytes(bytes_from_api)} "
              f"({per_rec_api:.1f} B/record)")

        return {
            "duration_ms": elapsed_ms,
            "row_count": n_rows,
            "num_partitions": self.num_partitions,
            "shuffle_disk_bytes": bytes_on_disk,
            "shuffle_disk_bytes_per_record": round(per_rec_disk, 1),
            "shuffle_api_bytes": bytes_from_api,
            "shuffle_api_bytes_per_record": round(per_rec_api, 1),
        }
60 changes: 59 additions & 1 deletion benchmarks/pyspark/generate_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,53 @@ def generate_data(output_path: str, num_rows: int, num_partitions: int):
spark.stop()


def generate_short_strings_data(output_path: str, num_rows: int,
                                num_partitions: int):
    """Generate data matching the schema from issue #3882.

    Reproduces the problematic scenario reported there: seven short
    unique string columns plus one timestamp column. The original
    reporter saw 3x shuffle overhead with 204M records of this shape
    (25.1 B/record in Comet vs 8.3 B/record in Spark).
    """

    spark = (
        SparkSession.builder
        .appName("ShuffleBenchmark-DataGen-ShortStrings")
        .getOrCreate()
    )

    print(f"Generating {num_rows:,} rows with {num_partitions} partitions")
    print(f"Output path: {output_path}")
    print("Schema: 7 short unique string columns + 1 timestamp (issue #3882)")

    base = spark.range(0, num_rows, numPartitions=num_partitions)

    # Seven short random string columns, mimicking the reporter's schema.
    # uuid() yields truly random strings that defeat compression, exposing
    # Arrow IPC per-batch overhead.
    projections = [
        f"substring(uuid(), 1, 8) as str_col_{i}" for i in range(1, 8)
    ]
    # Plus one timestamp column derived from the row id.
    projections.append("timestamp_seconds(1600000000 + id) as ts_col")
    df = base.selectExpr(*projections)

    print(f"Generated schema with {len(df.columns)} columns")
    df.printSchema()

    df.write.mode("overwrite").parquet(output_path)

    # Read back what was written to confirm the row count on disk.
    written_df = spark.read.parquet(output_path)
    actual_count = written_df.count()
    print(f"Wrote {actual_count:,} rows to {output_path}")

    spark.stop()


def main():
parser = argparse.ArgumentParser(
description="Generate test data for shuffle benchmark"
Expand All @@ -433,13 +480,24 @@ def main():
default=None,
help="Number of output partitions (default: auto based on cluster)"
)
parser.add_argument(
"--schema", "-s",
choices=["wide", "short-strings"],
default="wide",
help="Schema to generate: 'wide' (100 columns with nested types) "
"or 'short-strings' (7 short unique strings + 1 timestamp, "
"matches issue #3882)"
)

args = parser.parse_args()

# Default partitions to a reasonable number if not specified
num_partitions = args.partitions if args.partitions else 200

generate_data(args.output, args.rows, num_partitions)
if args.schema == "short-strings":
generate_short_strings_data(args.output, args.rows, num_partitions)
else:
generate_data(args.output, args.rows, num_partitions)


if __name__ == "__main__":
Expand Down
123 changes: 123 additions & 0 deletions benchmarks/pyspark/run_shuffle_size_benchmark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Compare shuffle write sizes between Spark and Comet shuffle.
#
# This benchmark measures actual shuffle write bytes reported by Spark
# to quantify the overhead of Comet's Arrow IPC shuffle format.
# See https://github.com/apache/datafusion-comet/issues/3882
#
# Prerequisites:
#   - SPARK_HOME set to a Spark 3.5 installation
#   - Comet JAR built (make)
#   - Input parquet data generated (see generate_data.py)
#
# Usage:
#   ./run_shuffle_size_benchmark.sh /path/to/parquet/data
#
# Environment variables:
#   COMET_JAR        Path to Comet JAR (default: auto-detected from repo)
#   SPARK_MASTER     Spark master URL (default: local[*])
#   EXECUTOR_MEMORY  Executor memory (default: 16g)
#   OFFHEAP_SIZE     Off-heap memory for Comet (default: 16g)

# Fail fast: -e aborts on any error, -u flags unset variables,
# -o pipefail surfaces failures inside pipelines (plain `set -e` misses
# both of the latter).
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
DATA_PATH="${1:?Usage: $0 /path/to/parquet/data}"
COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../../spark/target/comet-spark-spark3.5_2.12-0.15.0-SNAPSHOT.jar}"
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-16g}"
OFFHEAP_SIZE="${OFFHEAP_SIZE:-16g}"

# ${SPARK_HOME:-} so this check still works (with a friendly message)
# under `set -u` when the variable is entirely unset.
if [ -z "${SPARK_HOME:-}" ]; then
    echo "Error: SPARK_HOME is not set"
    exit 1
fi

if [ ! -f "$COMET_JAR" ]; then
    echo "Error: Comet JAR not found at $COMET_JAR"
    echo "Build with 'make' or set COMET_JAR to the correct path."
    exit 1
fi

echo "========================================"
echo "Shuffle Size Comparison Benchmark"
echo "========================================"
echo "Data path: $DATA_PATH"
echo "Comet JAR: $COMET_JAR"
echo "Spark master: $SPARK_MASTER"
echo "Executor memory: $EXECUTOR_MEMORY"
echo "Off-heap size: $OFFHEAP_SIZE"
echo "========================================"

# Use dedicated local dirs so we can measure actual shuffle file sizes on disk
SPARK_LOCAL_DIR=$(mktemp -d /tmp/spark-shuffle-bench-spark-XXXXXX)
COMET_LOCAL_DIR=$(mktemp -d /tmp/spark-shuffle-bench-comet-XXXXXX)

# Remove the temp shuffle dirs whether the runs succeed or fail.
cleanup() {
    rm -rf "$SPARK_LOCAL_DIR" "$COMET_LOCAL_DIR"
}
trap cleanup EXIT

# Run Spark baseline (no Comet)
echo ""
echo ">>> Running SPARK (no Comet) shuffle size benchmark..."
# Quote the spark-submit path: SPARK_HOME may contain spaces.
"$SPARK_HOME/bin/spark-submit" \
    --master "$SPARK_MASTER" \
    --driver-memory "$EXECUTOR_MEMORY" \
    --executor-memory "$EXECUTOR_MEMORY" \
    --conf spark.local.dir="$SPARK_LOCAL_DIR" \
    --conf spark.comet.enabled=false \
    "$SCRIPT_DIR/run_benchmark.py" \
    --data "$DATA_PATH" \
    --mode spark \
    --benchmark shuffle-size

# Run Comet Native shuffle
echo ""
echo ">>> Running COMET NATIVE shuffle size benchmark..."
"$SPARK_HOME/bin/spark-submit" \
    --master "$SPARK_MASTER" \
    --driver-memory "$EXECUTOR_MEMORY" \
    --executor-memory "$EXECUTOR_MEMORY" \
    --jars "$COMET_JAR" \
    --driver-class-path "$COMET_JAR" \
    --conf spark.executor.extraClassPath="$COMET_JAR" \
    --conf spark.local.dir="$COMET_LOCAL_DIR" \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size="$OFFHEAP_SIZE" \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=native \
    --conf spark.comet.explainFallback.enabled=true \
    "$SCRIPT_DIR/run_benchmark.py" \
    --data "$DATA_PATH" \
    --mode native \
    --benchmark shuffle-size

echo ""
echo "========================================"
echo "BENCHMARK COMPLETE"
echo "========================================"
echo "Compare 'Shuffle disk' bytes/record between the two runs above."
Loading