diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index ea91154b7d36..7bc8f23b4259 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -30,6 +30,17 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.InPredicateVisitor;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.LessOrEqual;
+import org.apache.paimon.predicate.LessThan;
+import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
@@ -55,11 +66,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.OptionalLong;
 
 import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
@@ -137,7 +152,7 @@ private static class ManifestsScan extends ReadOnceTableScan {
 
         @Override
         public InnerTableScan withFilter(Predicate predicate) {
-            // TODO
+            // filter is handled in ManifestsRead
             return this;
         }
 
@@ -169,20 +184,103 @@ public OptionalLong mergedRowCount() {
 
     private static class ManifestsRead implements InnerTableRead {
 
+        private static final String LEAF_NAME = "schema_id";
+
         private RowType readType;
 
         private final FileStoreTable dataTable;
 
+        /** Inclusive lower bound on schema_id, or null when unbounded. */
+        private @Nullable Long schemaIdMin = null;
+
+        /** Inclusive upper bound on schema_id, or null when unbounded. */
+        private @Nullable Long schemaIdMax = null;
+
+        /** Schema ids requested through an IN filter; empty when none was pushed. */
+        private final List<Long> schemaIds = new ArrayList<>();
+
         public ManifestsRead(FileStoreTable dataTable) {
             this.dataTable = dataTable;
         }
 
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
-            // TODO
+            if (predicate == null) {
+                return this;
+            }
+
+            // Each call replaces the previously pushed filter instead of accumulating state.
+            schemaIdMin = null;
+            schemaIdMax = null;
+            schemaIds.clear();
+
+            if (predicate instanceof CompoundPredicate) {
+                CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
+                if (compoundPredicate.function() instanceof And) {
+                    for (Predicate child : compoundPredicate.children()) {
+                        handleLeafPredicate(child, LEAF_NAME);
+                    }
+                } else if (compoundPredicate.function() instanceof Or) {
+                    // optimize for IN filter, expected here as an OR of equalities
+                    InPredicateVisitor.extractInElements(predicate, LEAF_NAME)
+                            .ifPresent(elements -> elements.forEach(this::addSchemaId));
+                }
+            } else {
+                handleLeafPredicate(predicate, LEAF_NAME);
+            }
+
             return this;
         }
 
+        /** Records one IN element; null literals are skipped as they match no schema id. */
+        private void addSchemaId(@Nullable Object element) {
+            if (element instanceof Number) {
+                schemaIds.add(((Number) element).longValue());
+            }
+        }
+
+        /**
+         * Narrows the schema_id range by the leaf predicate on {@code leafName}, if any.
+         * Bounds are intersected so that several conditions in one AND all take effect.
+         */
+        private void handleLeafPredicate(Predicate predicate, String leafName) {
+            LeafPredicate schemaPred =
+                    predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName);
+            if (schemaPred == null || schemaPred.literals().isEmpty()) {
+                return;
+            }
+
+            Object literal = schemaPred.literals().get(0);
+            if (!(literal instanceof Long)) {
+                // e.g. a null literal: nothing to derive a bound from, so push nothing down
+                return;
+            }
+            long value = (Long) literal;
+
+            if (schemaPred.function() instanceof Equal) {
+                raiseMin(value);
+                lowerMax(value);
+            } else if (schemaPred.function() instanceof GreaterThan) {
+                // saturate rather than wrap around at Long.MAX_VALUE
+                raiseMin(value == Long.MAX_VALUE ? value : value + 1);
+            } else if (schemaPred.function() instanceof GreaterOrEqual) {
+                raiseMin(value);
+            } else if (schemaPred.function() instanceof LessThan) {
+                // saturate rather than wrap around at Long.MIN_VALUE
+                lowerMax(value == Long.MIN_VALUE ? value : value - 1);
+            } else if (schemaPred.function() instanceof LessOrEqual) {
+                lowerMax(value);
+            }
+        }
+
+        private void raiseMin(long candidate) {
+            schemaIdMin = schemaIdMin == null ? candidate : Math.max(schemaIdMin, candidate);
+        }
+
+        private void lowerMax(long candidate) {
+            schemaIdMax = schemaIdMax == null ? candidate : Math.min(schemaIdMax, candidate);
+        }
+
         @Override
         public InnerTableRead withReadType(RowType readType) {
             this.readType = readType;
@@ -201,6 +299,17 @@ public RecordReader<InternalRow> createReader(Split split) {
             }
             List<ManifestFileMeta> manifestFileMetas = allManifests(dataTable);
 
+            // Apply schema_id filter
+            if (!schemaIds.isEmpty()) {
+                manifestFileMetas = filterBySchemaIds(manifestFileMetas, schemaIds);
+            } else if (schemaIdMin != null || schemaIdMax != null) {
+                manifestFileMetas =
+                        filterBySchemaIdRange(
+                                manifestFileMetas,
+                                Optional.ofNullable(schemaIdMin),
+                                Optional.ofNullable(schemaIdMax));
+            }
+
             @SuppressWarnings("unchecked")
             CastExecutor<InternalRow, BinaryString> partitionCastExecutor =
                     (CastExecutor<InternalRow, BinaryString>)
@@ -222,6 +331,35 @@ public RecordReader<InternalRow> createReader(Split split) {
             return new IteratorRecordReader<>(rows);
         }
 
+        /** Keeps the manifests whose schema id is one of {@code schemaIds}. */
+        private static List<ManifestFileMeta> filterBySchemaIds(
+                List<ManifestFileMeta> metas, List<Long> schemaIds) {
+            List<ManifestFileMeta> result = new ArrayList<>();
+            for (ManifestFileMeta meta : metas) {
+                if (schemaIds.contains(meta.schemaId())) {
+                    result.add(meta);
+                }
+            }
+            return result;
+        }
+
+        /** Keeps the manifests whose schema id lies in the inclusive range [min, max]. */
+        private static List<ManifestFileMeta> filterBySchemaIdRange(
+                List<ManifestFileMeta> metas, Optional<Long> min, Optional<Long> max) {
+            List<ManifestFileMeta> result = new ArrayList<>();
+            for (ManifestFileMeta meta : metas) {
+                long schemaId = meta.schemaId();
+                if (min.isPresent() && schemaId < min.get()) {
+                    continue;
+                }
+                if (max.isPresent() && schemaId > max.get()) {
+                    continue;
+                }
+                result.add(meta);
+            }
+            return result;
+        }
+
         private InternalRow toRow(
                 ManifestFileMeta manifestFileMeta,
                 CastExecutor<InternalRow, BinaryString> partitionCastExecutor) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index 3a4507c5b6dd..c8bf00e8130f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -33,10 +34,14 @@
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -44,6 +49,7 @@
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -164,6 +170,53 @@ public void testReadManifestsFromNotExistSnapshot() {
                         "Specified parameter scan.snapshot-id = 3 is not exist, you can set it in range from 1 to 2");
     }
 
+    @Test
+    public void testFilterBySchemaIdEqual() throws Exception {
+        List<InternalRow> expectedRow = getExpectedResult(2L);
+        List<InternalRow> result = readWithFilter(manifestsTable, schemaIdEqual(0L));
+        assertThat(result).containsExactlyElementsOf(expectedRow);
+    }
+
+    @Test
+    public void testFilterBySchemaIdEqualNoMatch() throws Exception {
+        List<InternalRow> result = readWithFilter(manifestsTable, schemaIdEqual(999L));
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    public void testFilterBySchemaIdRange() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        Predicate predicate =
+                PredicateBuilder.and(builder.greaterOrEqual(4, 0L), builder.lessOrEqual(4, 0L));
+        List<InternalRow> expectedRow = getExpectedResult(2L);
+        List<InternalRow> result = readWithFilter(manifestsTable, predicate);
+        assertThat(result).containsExactlyElementsOf(expectedRow);
+    }
+
+    @Test
+    public void testFilterBySchemaIdGreaterThan() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        List<InternalRow> result = readWithFilter(manifestsTable, builder.greaterThan(4, 0L));
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    public void testFilterBySchemaIdIn() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        Predicate predicate = builder.in(4, Arrays.asList(0L, 1L));
+        List<InternalRow> expectedRow = getExpectedResult(2L);
+        List<InternalRow> result = readWithFilter(manifestsTable, predicate);
+        assertThat(result).containsExactlyElementsOf(expectedRow);
+    }
+
+    @Test
+    public void testFilterBySchemaIdInNoMatch() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        Predicate predicate = builder.in(4, Arrays.asList(998L, 999L));
+        List<InternalRow> result = readWithFilter(manifestsTable, predicate);
+        assertThat(result).isEmpty();
+    }
+
     @Test
     void testManifestCreationTimeTimestamp() throws Exception {
         Identifier identifier = identifier("T_CreationTime");
@@ -243,4 +296,19 @@ private List<InternalRow> getExpectedResult(long snapshotId) {
         }
         return expectedRow;
     }
+
+    private Predicate schemaIdEqual(long schemaId) {
+        PredicateBuilder builder = new PredicateBuilder(ManifestsTable.TABLE_TYPE);
+        return builder.equal(4, schemaId);
+    }
+
+    private List<InternalRow> readWithFilter(Table table, Predicate predicate) throws Exception {
+        ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+        RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+        return rows;
+    }
 }