[SPARK-55885][SQL] Optimize vectorized Parquet boolean reading with lookup-table expansion and batch buffer reads#54818

Closed
LuciferYang wants to merge 3 commits into apache:master from LuciferYang:SPARK-55885
Conversation


@LuciferYang LuciferYang commented Mar 16, 2026

What changes were proposed in this pull request?

This PR optimizes the vectorized Parquet plain boolean reading path in two ways:

  1. Lookup-table-based bit expansion: Replace 8 individual byte writes per packed boolean byte with a single 64-bit Platform.putLong write, using a precomputed 256-entry lookup table (BOOL_BYTE_TO_LONG_TABLE) that expands each bit into a separate byte within a long. Big-endian platforms are handled via Long.reverseBytes().

  2. Batch buffer reads: Replace per-byte in.read() calls in VectorizedPlainValuesReader.readBooleans with a single getBuffer(fullBytes) call (backed by ByteBufferInputStream.slice), reducing I/O overhead from ~N/8 individual read calls to one bulk acquisition per batch.
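The lookup-table idea in point 1 can be illustrated with a small self-contained sketch (this is not the PR's actual code; class and method names here are illustrative, and the real implementation writes the long in one Platform.putLong call instead of unpacking it):

```java
// Sketch of lookup-table-based bit expansion: a 256-entry table maps each
// packed byte to a long whose 8 bytes are each 0x00 or 0x01.
public class BoolLookupDemo {
  // TABLE[b] holds, in little-endian byte order, one 0/1 byte per bit of b.
  static final long[] TABLE = new long[256];
  static {
    for (int b = 0; b < 256; b++) {
      long v = 0L;
      for (int bit = 0; bit < 8; bit++) {
        if (((b >>> bit) & 1) != 0) {
          v |= 1L << (8 * bit);  // bit i of b becomes byte i of the long
        }
      }
      TABLE[b] = v;
    }
  }

  // Expand one packed byte into 8 boolean bytes at dst[offset..offset+7].
  // The optimized reader would instead store TABLE[packed & 0xFF] with a
  // single 64-bit write (byte-reversed via Long.reverseBytes on big-endian
  // platforms); here we unpack portably for demonstration.
  static void expand(byte packed, byte[] dst, int offset) {
    long v = TABLE[packed & 0xFF];
    for (int i = 0; i < 8; i++) {
      dst[offset + i] = (byte) ((v >>> (8 * i)) & 1);
    }
  }

  public static void main(String[] args) {
    byte[] out = new byte[8];
    expand((byte) 0b1010_0001, out, 0);  // bits 0, 5, 7 are set
    System.out.println(java.util.Arrays.toString(out));
  }
}
```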

Why are the changes needed?

In the current implementation, reading a batch of N boolean values from Parquet requires:

  • N/8 individual in.read() calls (each going through ByteBufferInputStream)
  • 8 individual byte writes per packed byte (either array stores or Platform.putByte calls)

For a typical batch size of 4096, this means ~512 in.read() calls and ~4096 individual byte writes. The optimized path reduces this to:

  • 1 getBuffer() call for the entire batch
  • ~512 single Platform.putLong writes (one per packed byte)

This is a meaningful improvement on the hot path of Parquet boolean column scanning.
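The batch-read pattern described above can be sketched with plain java.nio (the PR itself acquires the bytes via parquet's ByteBufferInputStream / getBuffer; the names and structure below are illustrative only):

```java
import java.nio.ByteBuffer;

// Sketch of the batch-read idea: one bulk get() for all full packed bytes,
// then per-bit expansion, instead of one read() call per packed byte.
public class BatchBoolReadDemo {
  // Decode `total` bit-packed booleans (LSB-first, as in Parquet PLAIN
  // encoding) into one byte per value.
  static byte[] readBooleans(ByteBuffer in, int total) {
    byte[] out = new byte[total];
    int fullBytes = total / 8;
    // One bulk acquisition instead of `fullBytes` individual reads.
    byte[] packed = new byte[fullBytes];
    in.get(packed);
    for (int i = 0; i < fullBytes; i++) {
      byte b = packed[i];
      for (int bit = 0; bit < 8; bit++) {
        out[i * 8 + bit] = (byte) ((b >>> bit) & 1);
      }
    }
    // Trailing partial byte holds the remaining total % 8 values.
    int rem = total & 7;
    if (rem != 0) {
      byte b = in.get();
      for (int bit = 0; bit < rem; bit++) {
        out[fullBytes * 8 + bit] = (byte) ((b >>> bit) & 1);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) 0xFF, 0x01});
    System.out.println(java.util.Arrays.toString(readBooleans(buf, 10)));
  }
}
```

Combining this with the lookup-table expansion gives the "~512 putLong writes per 4096-value batch" figure quoted above.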

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Added new tests in ColumnVectorSuite
  • Ran a JMH micro-benchmark that simulates the change to verify the performance improvement.
Benchmark Code:
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.parquet.bytes.ByteBufferInputStream;

import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/**
 * JMH benchmark comparing old (baseline) vs new (optimized) VectorizedPlainValuesReader.
 *
 * Run via:
 *   ./build/sbt "sql/Test/runMain \
 *     org.apache.spark.sql.execution.datasources.parquet \
 *     .VectorizedPlainValuesReaderBenchmark"
 */
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 10, time = 2)
@Measurement(iterations = 100, time = 2)
@Fork(value = 2, jvmArgsAppend = {"-Xmx4g", "-Xms4g"})
@State(Scope.Thread)
public class VectorizedPlainValuesReaderBenchmark {

  private static final int BATCH_SIZE = 4096;

  private Random random;

  // --- Heap buffer data for each type ---
  private byte[] booleanData;

  // --- Column vectors ---
  private WritableColumnVector booleanColumn;

  @Param({"OnHeap", "OffHeap"})
  public String vectorType;

  // --- Readers ---
  private OldVectorizedPlainValuesReader oldReader;
  private VectorizedPlainValuesReader newReader;

  @Setup(Level.Trial)
  public void setup() {
    random = new Random(42);

    // Boolean data: packed bits, ceil(BATCH_SIZE / 8) bytes
    int booleanBytes = (BATCH_SIZE + 7) / 8;
    booleanData = new byte[booleanBytes];
    random.nextBytes(booleanData);

    // Allocate column vectors
    if ("OffHeap".equalsIgnoreCase(vectorType)) {
      booleanColumn = new OffHeapColumnVector(BATCH_SIZE, DataTypes.BooleanType);
    } else {
      booleanColumn = new OnHeapColumnVector(BATCH_SIZE, DataTypes.BooleanType);
    }
  }

  @TearDown(Level.Trial)
  public void tearDown() {
    booleanColumn.close();
  }

  @Setup(Level.Invocation)
  public void prepareInvocation() throws IOException {
    booleanColumn.reset();
    oldReader = initOld(wrapHeap(booleanData));
    newReader = initNew(wrapHeap(booleanData));
  }

  // --- Helper methods ---

  private ByteBufferInputStream wrapHeap(byte[] data) {
    ByteBuffer buf = ByteBuffer.wrap(data);
    return ByteBufferInputStream.wrap(buf);
  }

  private OldVectorizedPlainValuesReader initOld(ByteBufferInputStream stream) throws IOException {
    OldVectorizedPlainValuesReader reader = new OldVectorizedPlainValuesReader();
    reader.initFromPage(BATCH_SIZE, stream);
    return reader;
  }

  private VectorizedPlainValuesReader initNew(ByteBufferInputStream stream) throws IOException {
    VectorizedPlainValuesReader reader = new VectorizedPlainValuesReader();
    reader.initFromPage(BATCH_SIZE, stream);
    return reader;
  }

  // ==================== Boolean benchmarks ====================

  @Benchmark
  public void readBooleans_old() throws IOException {
    oldReader.readBooleans(BATCH_SIZE, booleanColumn, 0);
  }

  @Benchmark
  public void readBooleans_new() throws IOException {
    newReader.readBooleans(BATCH_SIZE, booleanColumn, 0);
  }

  // ==================== Main entry point ====================

  public static void main(String[] args) throws Exception {
    Options opt = new OptionsBuilder()
        .include(VectorizedPlainValuesReaderBenchmark.class.getSimpleName())
        .build();
    new Runner(opt).run();
  }
}

Run build/sbt "sql/Test/runMain org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderBenchmark" to reproduce the benchmark.

Benchmark results:

  • Java 17.0.18
[info] Benchmark                                              (vectorType)  Mode  Cnt     Score    Error  Units
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_new        OnHeap  avgt  200   280.158 ± 10.135  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_new       OffHeap  avgt  200   264.486 ±  5.720  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_old        OnHeap  avgt  200   999.699 ±  0.719  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_old       OffHeap  avgt  200  1152.287 ±  0.407  ns/op
  • Java 21.0.10
[info] Benchmark                                              (vectorType)  Mode  Cnt    Score   Error  Units
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_new        OnHeap  avgt  200  284.184 ± 0.557  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_new       OffHeap  avgt  200  262.629 ± 5.199  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_old        OnHeap  avgt  200  979.432 ± 0.708  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_old       OffHeap  avgt  200  991.920 ± 1.109  ns/op
  • Java 25
[info] Benchmark                                              (vectorType)  Mode  Cnt     Score   Error  Units
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_new        OnHeap  avgt  200   300.721 ± 0.361  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_new       OffHeap  avgt  200   326.190 ± 1.530  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_old        OnHeap  avgt  200   993.619 ± 1.023  ns/op
[info] VectorizedPlainValuesReaderBenchmark.readBooleans_old       OffHeap  avgt  200  1153.824 ± 1.005  ns/op

In these micro-benchmarks, the optimized path reduces latency by more than 3x across all tested JDKs and vector types.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

@LuciferYang LuciferYang marked this pull request as draft March 16, 2026 06:13
@LuciferYang
Contributor Author

Test first; after supplementing the benchmark results, the code unrelated to this PR will be reverted.

This reverts commit 35dd829.
@LuciferYang LuciferYang marked this pull request as ready for review March 16, 2026 16:37
Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Thank you, @LuciferYang .

@dongjoon-hyun
Member

BTW, @LuciferYang . Although I understand why you used the term, Assisted, please follow the Apache Spark Pull Request Template, which is based on ASF Guidance. The community guidance requires the Generated-by: clause.

If generative AI tooling has been used in the process of authoring this patch, please include the
phrase: 'Generated-by: ' followed by the name of the tool and its version.

https://www.apache.org/legal/generative-tooling.html

@dongjoon-hyun
Member

Anyway, merged to master~

@LuciferYang
Contributor Author

> BTW, @LuciferYang . Although I understand why you used the term, Assisted, please follow the Apache Spark Pull Request Template, which is based on ASF Guidance. The community guidance requires the Generated-by: clause.
>
> If generative AI tooling has been used in the process of authoring this patch, please include the
> phrase: 'Generated-by: ' followed by the name of the tool and its version.
>
> https://www.apache.org/legal/generative-tooling.html

Thank you for your correction.
