Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.predicate.And;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.GreaterOrEqual;
import org.apache.paimon.predicate.GreaterThan;
import org.apache.paimon.predicate.InPredicateVisitor;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -55,11 +66,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
Expand Down Expand Up @@ -137,7 +152,7 @@ private static class ManifestsScan extends ReadOnceTableScan {

/**
 * Accepts a filter without applying it during scanning.
 *
 * @param predicate the pushed-down filter; not inspected by this scan
 * @return this scan, unchanged
 */
@Override
public InnerTableScan withFilter(Predicate predicate) {
// Intentionally a no-op: the predicate is not evaluated while scanning;
// filter is handled in ManifestsRead
return this;
}

Expand Down Expand Up @@ -169,20 +184,80 @@ public OptionalLong mergedRowCount() {

private static class ManifestsRead implements InnerTableRead {

private static final String LEAF_NAME = "schema_id";

private RowType readType;

private final FileStoreTable dataTable;

private @Nullable Long schemaIdMin = null;
private @Nullable Long schemaIdMax = null;
private final List<Long> schemaIds = new ArrayList<>();

/**
 * Creates a read over the manifests of the given table.
 *
 * @param dataTable the table whose manifest metadata is read; stored as-is
 */
public ManifestsRead(FileStoreTable dataTable) {
this.dataTable = dataTable;
}

/**
 * Records the {@code schema_id} constraints found in {@code predicate} so that
 * {@code createReader} can drop manifests that cannot match.
 *
 * <p>Recognized shapes:
 *
 * <ul>
 *   <li>a single comparison on {@code schema_id} ({@code =, >, >=, <, <=});
 *   <li>an {@code AND} whose children are examined one by one, with all recognized bounds
 *       intersected;
 *   <li>an {@code OR} that is an IN-list on {@code schema_id}.
 * </ul>
 *
 * <p>Anything else is ignored, i.e. not pushed down. Each call with a non-null predicate
 * replaces the constraints recorded by earlier calls; a {@code null} predicate leaves the
 * current state untouched.
 *
 * @param predicate the filter to push down, may be {@code null}
 * @return this read
 */
@Override
public InnerTableRead withFilter(Predicate predicate) {
    if (predicate == null) {
        return this;
    }

    // Start from a clean slate so that constraints of a previous call (in particular a stale
    // IN-list, which takes precedence over range bounds) cannot leak into this filter.
    schemaIdMin = null;
    schemaIdMax = null;
    schemaIds.clear();

    if (!(predicate instanceof CompoundPredicate)) {
        handleLeafPredicate(predicate, LEAF_NAME);
        return this;
    }

    CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
    if (compoundPredicate.function() instanceof And) {
        for (Predicate child : compoundPredicate.children()) {
            handleLeafPredicate(child, LEAF_NAME);
        }
    } else if (compoundPredicate.function() instanceof Or) {
        // optimize for IN filter
        InPredicateVisitor.extractInElements(predicate, LEAF_NAME)
                .ifPresent(
                        elements -> {
                            for (Object element : elements) {
                                // A null element can never equal a schema id; skipping it
                                // also avoids an NPE on toString().
                                if (element != null) {
                                    schemaIds.add(Long.parseLong(element.toString()));
                                }
                            }
                        });
    }

    return this;
}

/**
 * Folds one conjunct into the inclusive {@code [schemaIdMin, schemaIdMax]} range.
 *
 * <p>Bounds are only ever tightened, so several comparisons combined with {@code AND} yield
 * their intersection regardless of the order in which they are visited. Strict comparisons
 * are converted to inclusive bounds; when that conversion would overflow, the predicate is
 * unsatisfiable and the range is marked empty. Predicates on other fields, unsupported
 * functions and literals that are not a non-null {@code Long} are ignored.
 *
 * @param predicate the conjunct to inspect
 * @param leafName name of the field whose leaf predicate is extracted
 */
private void handleLeafPredicate(Predicate predicate, String leafName) {
    LeafPredicate schemaPred = predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName);
    if (schemaPred == null) {
        return;
    }

    List<Object> literals = schemaPred.literals();
    if (literals == null || literals.isEmpty() || !(literals.get(0) instanceof Long)) {
        // No usable literal (e.g. a null literal or a literal-free function): skip pushdown.
        return;
    }
    long value = (Long) literals.get(0);

    if (schemaPred.function() instanceof Equal) {
        raiseSchemaIdMin(value);
        lowerSchemaIdMax(value);
    } else if (schemaPred.function() instanceof GreaterThan) {
        if (value == Long.MAX_VALUE) {
            // Nothing is greater than Long.MAX_VALUE; value + 1 would wrap around.
            markSchemaIdRangeEmpty();
        } else {
            raiseSchemaIdMin(value + 1);
        }
    } else if (schemaPred.function() instanceof GreaterOrEqual) {
        raiseSchemaIdMin(value);
    } else if (schemaPred.function() instanceof LessThan) {
        if (value == Long.MIN_VALUE) {
            // Nothing is less than Long.MIN_VALUE; value - 1 would wrap around.
            markSchemaIdRangeEmpty();
        } else {
            lowerSchemaIdMax(value - 1);
        }
    } else if (schemaPred.function() instanceof LessOrEqual) {
        lowerSchemaIdMax(value);
    }
}

/** Raises the inclusive lower bound to {@code bound} unless it is already at least that high. */
private void raiseSchemaIdMin(long bound) {
    schemaIdMin = schemaIdMin == null ? bound : Math.max(schemaIdMin, bound);
}

/** Lowers the inclusive upper bound to {@code bound} unless it is already at least that low. */
private void lowerSchemaIdMax(long bound) {
    schemaIdMax = schemaIdMax == null ? bound : Math.min(schemaIdMax, bound);
}

/** Sets min above max so that no schema id can fall inside the range. */
private void markSchemaIdRangeEmpty() {
    schemaIdMin = Long.MAX_VALUE;
    schemaIdMax = Long.MIN_VALUE;
}

@Override
public InnerTableRead withReadType(RowType readType) {
this.readType = readType;
Expand All @@ -201,6 +276,17 @@ public RecordReader<InternalRow> createReader(Split split) {
}
List<ManifestFileMeta> manifestFileMetas = allManifests(dataTable);

// Apply schema_id filter
if (!schemaIds.isEmpty()) {
manifestFileMetas = filterBySchemaIds(manifestFileMetas, schemaIds);
} else if (schemaIdMin != null || schemaIdMax != null) {
manifestFileMetas =
filterBySchemaIdRange(
manifestFileMetas,
Optional.ofNullable(schemaIdMin),
Optional.ofNullable(schemaIdMax));
}

@SuppressWarnings("unchecked")
CastExecutor<InternalRow, BinaryString> partitionCastExecutor =
(CastExecutor<InternalRow, BinaryString>)
Expand All @@ -222,6 +308,33 @@ public RecordReader<InternalRow> createReader(Split split) {
return new IteratorRecordReader<>(rows);
}

/**
 * Keeps only the manifests whose schema id is contained in {@code schemaIds}.
 *
 * @param metas manifests to filter; not modified
 * @param schemaIds accepted schema ids
 * @return a new list with the matching manifests in their original order
 */
private static List<ManifestFileMeta> filterBySchemaIds(
        List<ManifestFileMeta> metas, List<Long> schemaIds) {
    // Work on a copy so the caller's list stays untouched, then drop the non-matching entries.
    List<ManifestFileMeta> kept = new ArrayList<>(metas);
    kept.removeIf(candidate -> !schemaIds.contains(candidate.schemaId()));
    return kept;
}

/**
 * Keeps only the manifests whose schema id lies within the given inclusive bounds.
 *
 * @param metas manifests to filter; not modified
 * @param min inclusive lower bound, or empty for no lower bound
 * @param max inclusive upper bound, or empty for no upper bound
 * @return a new list with the matching manifests in their original order
 */
private static List<ManifestFileMeta> filterBySchemaIdRange(
        List<ManifestFileMeta> metas, Optional<Long> min, Optional<Long> max) {
    // An absent bound is equivalent to the extreme long value on that side.
    long lowest = min.orElse(Long.MIN_VALUE);
    long highest = max.orElse(Long.MAX_VALUE);

    List<ManifestFileMeta> inRange = new ArrayList<>();
    for (ManifestFileMeta candidate : metas) {
        long id = candidate.schemaId();
        if (id >= lowest && id <= highest) {
            inRange.add(candidate);
        }
    }
    return inRange;
}

private InternalRow toRow(
ManifestFileMeta manifestFileMeta,
CastExecutor<InternalRow, BinaryString> partitionCastExecutor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
Expand All @@ -33,17 +34,22 @@
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SnapshotManager;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -164,6 +170,53 @@ public void testReadManifestsFromNotExistSnapshot() {
"Specified parameter scan.snapshot-id = 3 is not exist, you can set it in range from 1 to 2");
}

/** {@code schema_id = 0} must return exactly the rows expected for snapshot 2. */
@Test
public void testFilterBySchemaIdEqual() throws Exception {
    List<InternalRow> expected = getExpectedResult(2L);
    Predicate schemaIsZero = schemaIdEqual(0L);

    List<InternalRow> actual = readWithFilter(manifestsTable, schemaIsZero);

    assertThat(actual).containsExactlyElementsOf(expected);
}

/** An equality filter on a schema id that no manifest carries must yield no rows. */
@Test
public void testFilterBySchemaIdEqualNoMatch() throws Exception {
    Predicate unknownSchema = schemaIdEqual(999L);

    List<InternalRow> actual = readWithFilter(manifestsTable, unknownSchema);

    assertThat(actual).isEmpty();
}

/** {@code schema_id >= 0 AND schema_id <= 0} must return the rows expected for snapshot 2. */
@Test
public void testFilterBySchemaIdRange() throws Exception {
    PredicateBuilder predicates = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
    // Field index 4 is the one used for schema_id throughout these tests.
    Predicate lowerBound = predicates.greaterOrEqual(4, 0L);
    Predicate upperBound = predicates.lessOrEqual(4, 0L);
    Predicate closedRange = PredicateBuilder.and(lowerBound, upperBound);

    List<InternalRow> expected = getExpectedResult(2L);
    List<InternalRow> actual = readWithFilter(manifestsTable, closedRange);

    assertThat(actual).containsExactlyElementsOf(expected);
}

/** {@code schema_id > 0} must yield no rows for this table. */
@Test
public void testFilterBySchemaIdGreaterThan() throws Exception {
    Predicate aboveZero = new PredicateBuilder(ManifestsTable.TABLE_TYPE).greaterThan(4, 0L);

    List<InternalRow> actual = readWithFilter(manifestsTable, aboveZero);

    assertThat(actual).isEmpty();
}

/** {@code schema_id IN (0, 1)} must return the rows expected for snapshot 2. */
@Test
public void testFilterBySchemaIdIn() throws Exception {
    Predicate zeroOrOne =
            new PredicateBuilder(ManifestsTable.TABLE_TYPE).in(4, Arrays.asList(0L, 1L));

    List<InternalRow> expected = getExpectedResult(2L);
    List<InternalRow> actual = readWithFilter(manifestsTable, zeroOrOne);

    assertThat(actual).containsExactlyElementsOf(expected);
}

/** An IN-list made only of schema ids that no manifest carries must yield no rows. */
@Test
public void testFilterBySchemaIdInNoMatch() throws Exception {
    Predicate unknownSchemas =
            new PredicateBuilder(ManifestsTable.TABLE_TYPE).in(4, Arrays.asList(998L, 999L));

    List<InternalRow> actual = readWithFilter(manifestsTable, unknownSchemas);

    assertThat(actual).isEmpty();
}

@Test
void testManifestCreationTimeTimestamp() throws Exception {
Identifier identifier = identifier("T_CreationTime");
Expand Down Expand Up @@ -243,4 +296,19 @@ private List<InternalRow> getExpectedResult(long snapshotId) {
}
return expectedRow;
}

/**
 * Builds an equality predicate on field index 4 of {@code ManifestsTable.TABLE_TYPE}, the
 * index these tests use for {@code schema_id}.
 *
 * @param schemaId the schema id to compare against
 * @return the equality predicate
 */
private Predicate schemaIdEqual(long schemaId) {
    return new PredicateBuilder(ManifestsTable.TABLE_TYPE).equal(4, schemaId);
}

/**
 * Reads every row of {@code table} through a read builder configured with {@code predicate}.
 *
 * @param table the table to read
 * @param predicate the filter handed to the read builder
 * @return copies of all rows produced by the reader, in reader order
 */
private List<InternalRow> readWithFilter(Table table, Predicate predicate) throws Exception {
    ReadBuilder filteredBuilder = table.newReadBuilder().withFilter(predicate);
    RecordReader<InternalRow> rowReader =
            filteredBuilder.newRead().createReader(filteredBuilder.newScan().plan());

    // Each row is copied through the serializer before being stored in the result list.
    InternalRowSerializer rowCopier = new InternalRowSerializer(table.rowType());
    List<InternalRow> collected = new ArrayList<>();
    rowReader.forEachRemaining(
            current -> {
                collected.add(rowCopier.copy(current));
            });
    return collected;
}
}
Loading