Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.benchmarks;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.LocalInputFile;
import org.apache.parquet.io.LocalOutputFile;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;

import java.util.concurrent.TimeUnit;

/**
* JMH benchmarks for ALP (Adaptive Lossless floating-Point) encoding.
*
* <p>Compares ALP vs PLAIN encoding for float and double columns across write and read paths.
* Uses realistic floating-point data with limited decimal precision — the type of data ALP
* is designed to compress (e.g. sensor readings, prices, timestamps as doubles).
*
* <p>Run with:
* <pre>
* mvn package -pl parquet-benchmarks -am -DskipTests
* java -jar parquet-benchmarks/target/parquet-benchmarks.jar AlpEncodingBenchmarks
* </pre>
*/
@State(Scope.Thread)
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AlpEncodingBenchmarks {

  /** Number of rows written to / read from each benchmark file. */
  private static final int N_ROWS = 1_000_000;

  /** Two-column schema: one required double and one required float. */
  private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(
      "message alp_bench { required double double_col; required float float_col; }");

  // Pre-generated data arrays — allocated once, reused across iterations.
  private static final double[] DOUBLES = new double[N_ROWS];
  private static final float[] FLOATS = new float[N_ROWS];

  static {
    // Simulate sensor/metric data: values with 2-3 decimal digits of precision.
    // ALP excels at this pattern; plain encoding stores all 8 bytes per double regardless.
    for (int i = 0; i < N_ROWS; i++) {
      DOUBLES[i] = Math.round((i * 0.01 + 100.0) * 100.0) / 100.0;
      // Math.round(float) returns int; int / 100.0f is already a float, so no cast is needed.
      FLOATS[i] = Math.round((i * 0.01f + 10.0f) * 100.0f) / 100.0f;
    }
  }

  // Files written once per trial (inputs for the read benchmarks).
  private Path alpReadFile;
  private Path plainReadFile;

  // File written fresh for every write-benchmark invocation.
  private Path writeTarget;

  /** Writes the ALP- and PLAIN-encoded input files consumed by the read benchmarks. */
  @Setup(Level.Trial)
  public void generateReadFiles() throws IOException {
    alpReadFile = Files.createTempFile("alp_bench_alp_", ".parquet");
    Files.delete(alpReadFile); // LocalOutputFile must not pre-exist
    writeParquetFile(alpReadFile, true);

    plainReadFile = Files.createTempFile("alp_bench_plain_", ".parquet");
    Files.delete(plainReadFile);
    writeParquetFile(plainReadFile, false);
  }

  @TearDown(Level.Trial)
  public void deleteReadFiles() throws IOException {
    if (alpReadFile != null) Files.deleteIfExists(alpReadFile);
    if (plainReadFile != null) Files.deleteIfExists(plainReadFile);
  }

  /**
   * Prepares a fresh write target before EVERY invocation, not once per iteration:
   * LocalOutputFile must not pre-exist, so a second invocation within the same
   * iteration (e.g. when the benchmark mode is overridden on the command line)
   * would otherwise fail with "file already exists". Invocation-level fixtures
   * add some timestamping overhead, but it is negligible next to writing 1M rows.
   */
  @Setup(Level.Invocation)
  public void prepareWriteTarget() throws IOException {
    writeTarget = Files.createTempFile("alp_bench_write_", ".parquet");
    Files.delete(writeTarget); // LocalOutputFile must not pre-exist
  }

  @TearDown(Level.Invocation)
  public void deleteWriteTarget() throws IOException {
    if (writeTarget != null) Files.deleteIfExists(writeTarget);
  }

  // ---------------------------------------------------------------------------
  // Write benchmarks
  // ---------------------------------------------------------------------------

  /** Measures writing N_ROWS double/float rows with ALP encoding. */
  @Benchmark
  public void writeDoubleAndFloatALP() throws IOException {
    writeParquetFile(writeTarget, true);
  }

  /** Measures writing N_ROWS double/float rows with PLAIN encoding. */
  @Benchmark
  public void writeDoubleAndFloatPlain() throws IOException {
    writeParquetFile(writeTarget, false);
  }

  // ---------------------------------------------------------------------------
  // Read benchmarks
  // ---------------------------------------------------------------------------

  /** Measures decoding every row of the ALP-encoded file. */
  @Benchmark
  public void readDoubleAndFloatALP(Blackhole bh) throws IOException {
    readParquetFile(alpReadFile, bh);
  }

  /** Measures decoding every row of the PLAIN-encoded file. */
  @Benchmark
  public void readDoubleAndFloatPlain(Blackhole bh) throws IOException {
    readParquetFile(plainReadFile, bh);
  }

  // ---------------------------------------------------------------------------
  // Helpers
  // ---------------------------------------------------------------------------

  /**
   * Writes N_ROWS rows of the pre-generated data to {@code path}.
   *
   * @param path target file; must not already exist
   * @param alp true to enable ALP encoding, false to fall back to PLAIN
   * @throws IOException if the write fails
   */
  private static void writeParquetFile(Path path, boolean alp) throws IOException {
    try (org.apache.parquet.hadoop.ParquetWriter<Group> writer =
        ExampleParquetWriter.builder(new LocalOutputFile(path))
            .withType(SCHEMA)
            // No compression and no dictionary, so the measurement isolates the encoding itself.
            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
            .withAlpEncoding(alp)
            .withDictionaryEncoding(false)
            .build()) {
      for (int i = 0; i < N_ROWS; i++) {
        SimpleGroup row = new SimpleGroup(SCHEMA);
        row.add("double_col", DOUBLES[i]);
        row.add("float_col", FLOATS[i]);
        writer.write(row);
      }
    }
  }

  /**
   * Reads every row group of {@code path}, feeding both column values into the
   * blackhole so the JIT cannot elide the decode work.
   *
   * @param path file to read
   * @param bh JMH blackhole consuming the decoded values
   * @throws IOException if the read fails
   */
  private static void readParquetFile(Path path, Blackhole bh) throws IOException {
    try (ParquetFileReader reader = ParquetFileReader.open(new LocalInputFile(path))) {
      MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(SCHEMA);
      PageReadStore pages;
      while ((pages = reader.readNextRowGroup()) != null) {
        RecordReader<Group> records =
            columnIO.getRecordReader(pages, new GroupRecordConverter(SCHEMA));
        long rowCount = pages.getRowCount();
        for (long i = 0; i < rowCount; i++) {
          Group row = records.read();
          bh.consume(row.getDouble("double_col", 0));
          bh.consume(row.getFloat("float_col", 0));
        }
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.alp.AlpValuesReaderForDouble;
import org.apache.parquet.column.values.alp.AlpValuesReaderForFloat;
import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA;
Expand Down Expand Up @@ -147,6 +149,26 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
}
},

/**
 * ALP (Adaptive Lossless floating-Point) encoding for FLOAT and DOUBLE types.
 * Works by converting floating-point values to integers using decimal scaling,
 * then applying Frame of Reference (FOR) encoding and bit-packing.
 */
ALP {
  @Override
  public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
    // valuesType is intentionally unused: ALP supplies a single reader per
    // floating-point primitive type, chosen solely from the column's type.
    switch (descriptor.getType()) {
      case FLOAT:
        return new AlpValuesReaderForFloat();
      case DOUBLE:
        return new AlpValuesReaderForDouble();
      default:
        // No ALP reader exists for non-floating-point primitives; fail decoding
        // with a message naming the offending type.
        throw new ParquetDecodingException(
            "ALP encoding is only supported for FLOAT and DOUBLE, not " + descriptor.getType());
    }
  }
},

/**
* @deprecated This is no longer used, and has been replaced by {@link #RLE}
* which is combination of bit packing and rle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ParquetProperties {
public static final int DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
public static final boolean DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED = false;
public static final boolean DEFAULT_IS_ALP_ENABLED = false;
public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0;
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
Expand Down Expand Up @@ -132,6 +133,7 @@ public static WriterVersion fromString(String name) {
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
private final ColumnProperty<Boolean> alpEnabled;
private final Map<String, String> extraMetaData;
private final ColumnProperty<Boolean> statistics;
private final ColumnProperty<Boolean> sizeStatistics;
Expand Down Expand Up @@ -164,6 +166,7 @@ private ParquetProperties(Builder builder) {
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
this.alpEnabled = builder.alpEnabled.build();
this.extraMetaData = builder.extraMetaData;
this.statistics = builder.statistics.build();
this.sizeStatistics = builder.sizeStatistics.build();
Expand Down Expand Up @@ -259,6 +262,23 @@ public boolean isByteStreamSplitEnabled(ColumnDescriptor column) {
}
}

/**
 * Returns whether ALP encoding should be applied to the given column.
 *
 * <p>ALP is defined only for FLOAT and DOUBLE primitives; every other
 * primitive type yields {@code false} regardless of the configured property.
 *
 * @param column the column descriptor to check
 * @return true if the column is floating-point and ALP is enabled for it
 */
public boolean isAlpEnabled(ColumnDescriptor column) {
  final boolean floatingPoint;
  switch (column.getPrimitiveType().getPrimitiveTypeName()) {
    case FLOAT:
    case DOUBLE:
      floatingPoint = true;
      break;
    default:
      floatingPoint = false;
      break;
  }
  // Only consult the per-column property once the type is known to qualify.
  return floatingPoint && alpEnabled.getValue(column);
}

public ByteBufferAllocator getAllocator() {
return allocator;
}
Expand Down Expand Up @@ -388,7 +408,8 @@ public String toString() {
+ "Page row count limit to " + getPageRowCountLimit() + '\n'
+ "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off") + '\n'
+ "Statistics enabled: " + statisticsEnabled + '\n'
+ "Size statistics enabled: " + sizeStatisticsEnabled;
+ "Size statistics enabled: " + sizeStatisticsEnabled + '\n'
+ "ALP enabled: " + alpEnabled;
}

public static class Builder {
Expand Down Expand Up @@ -416,6 +437,7 @@ public static class Builder {
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
private final ColumnProperty.Builder<Boolean> alpEnabled;
private Map<String, String> extraMetaData = new HashMap<>();
private final ColumnProperty.Builder<Boolean> statistics;
private final ColumnProperty.Builder<Boolean> sizeStatistics;
Expand All @@ -427,6 +449,7 @@ private Builder() {
DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED
? ByteStreamSplitMode.FLOATING_POINT
: ByteStreamSplitMode.NONE);
alpEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_ALP_ENABLED);
bloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
bloomFilterFPPs = ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
Expand Down Expand Up @@ -457,6 +480,7 @@ private Builder(ParquetProperties toCopy) {
this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled);
this.alpEnabled = ColumnProperty.builder(toCopy.alpEnabled);
this.extraMetaData = toCopy.extraMetaData;
this.statistics = ColumnProperty.builder(toCopy.statistics);
this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
Expand Down Expand Up @@ -534,6 +558,29 @@ public Builder withExtendedByteStreamSplitEncoding(boolean enable) {
return this;
}

/**
 * Globally enables or disables ALP encoding for FLOAT and DOUBLE columns.
 * This sets the default applied to every column without a per-column override.
 *
 * @param enable whether ALP encoding should be enabled
 * @return this builder for method chaining.
 */
public Builder withAlpEncoding(boolean enable) {
  alpEnabled.withDefaultValue(enable);
  return this;
}

/**
 * Enables or disables ALP encoding for one specific column, taking precedence
 * over the global default for that column.
 *
 * @param columnPath the path of the column (dot-string)
 * @param enable whether ALP encoding should be enabled
 * @return this builder for method chaining.
 */
public Builder withAlpEncoding(String columnPath, boolean enable) {
  alpEnabled.withValue(columnPath, enable);
  return this;
}

/**
* Set the Parquet format dictionary page size.
*
Expand Down
Loading