diff --git a/contrib/format-paimon/README.md b/contrib/format-paimon/README.md
new file mode 100644
index 00000000000..224405dacc5
--- /dev/null
+++ b/contrib/format-paimon/README.md
@@ -0,0 +1,96 @@
+# Apache Paimon format plugin
+
+This format plugin enables Drill to query Apache Paimon tables.
+
+Unlike regular format plugins, the Paimon table is a folder with data and metadata files, but Drill checks the presence
+of the `snapshot` directory and `schema` directory to ensure that the table is a Paimon one.
+
+Drill supports reading all formats of Paimon tables currently supported via Paimon Java API: Parquet and ORC.
+No need to provide actual table format, it will be discovered automatically.
+
+For details related to Apache Paimon table format, please refer to [official docs](https://paimon.apache.org/).
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project pushdown optimization.
+
+For the case of project pushdown, only columns specified in the query will be read. In conjunction with
+column-oriented formats like Parquet or ORC, it allows improving reading performance significantly.
+
+### Filter pushdown
+
+This format plugin supports filter pushdown optimization.
+
+For the case of filter pushdown, expressions supported by Paimon API will be pushed down, so only data that matches
+the filter expression will be read.
+
+### Limit pushdown
+
+This format plugin supports limit pushdown optimization.
+
+The limit is pushed down to Paimon scan planning to reduce the amount of data read.
+
+### Querying table metadata
+
+Apache Drill provides the ability to query table metadata exposed by Paimon.
+
+At this point, Apache Paimon has the following metadata kinds:
+
+* SNAPSHOTS
+* SCHEMAS
+* FILES
+* MANIFESTS
+
+To query specific metadata, just add the `#metadata_name` suffix to the table location, like in the following example:
+
+```sql
+SELECT *
+FROM dfs.tmp.`testTable#snapshots`;
+```
+
+### Querying specific table versions (time travel)
+
+Apache Paimon has the ability to track the table modifications and read specific version before or after modifications
+or modifications itself.
+
+This format plugin embraces this ability and provides an easy-to-use way of triggering it.
+
+The following ways of specifying table version are supported:
+
+- `snapshotId` - id of the specific snapshot
+- `snapshotAsOfTime` - the most recent snapshot as of the given time in milliseconds
+
+Table function can be used to specify one of the above configs in the following way:
+
+```sql
+SELECT *
+FROM table(dfs.tmp.testTable(type => 'paimon', snapshotId => 1));
+
+SELECT *
+FROM table(dfs.tmp.testTable(type => 'paimon', snapshotAsOfTime => 1736345510000));
+```
+
+Note: `snapshotId` and `snapshotAsOfTime` are mutually exclusive and cannot be specified at the same time.
+
+## Configuration
+
+The only required configuration option is:
+
+- `type` - format plugin type, should be `'paimon'`
+
+Note: `snapshotId` and `snapshotAsOfTime` for time travel queries are specified at query time using the `table()` function.
+
+### Format config example:
+
+```json
+{
+ "type": "file",
+ "formats": {
+ "paimon": {
+ "type": "paimon"
+ }
+ }
+}
+```
diff --git a/contrib/format-paimon/pom.xml b/contrib/format-paimon/pom.xml
new file mode 100644
index 00000000000..0bc02a9da0a
--- /dev/null
+++ b/contrib/format-paimon/pom.xml
@@ -0,0 +1,123 @@
+
+
+
+ 4.0.0
+
+ drill-contrib-parent
+ org.apache.drill.contrib
+ 1.23.0-SNAPSHOT
+
+
+ drill-paimon-format
+
+ Drill : Contrib : Format : Paimon
+
+
+
+ org.apache.drill.exec
+ drill-java-exec
+ ${project.version}
+
+
+
+ org.apache.paimon
+ paimon-core
+ ${paimon.version}
+
+
+ org.apache.paimon
+ paimon-common
+ ${paimon.version}
+
+
+ org.apache.paimon
+ paimon-api
+ ${paimon.version}
+
+
+ org.apache.paimon
+ paimon-codegen-loader
+ ${paimon.version}
+
+
+ org.apache.paimon
+ paimon-format
+ ${paimon.version}
+
+
+ org.apache.paimon
+ paimon-shade-jackson-2
+ 2.14.2-0.8.0
+
+
+ org.apache.paimon
+ paimon-shade-guava-30
+ 30.1.1-jre-0.8.0
+
+
+ org.apache.paimon
+ paimon-shade-caffeine-2
+ 2.9.3-0.8.0
+
+
+ org.apache.paimon
+ paimon-shade-netty-4
+ 4.1.100.Final-0.8.0
+
+
+ io.airlift
+ aircompressor
+ 0.27
+
+
+ org.lz4
+ lz4-java
+ 1.8.0
+
+
+ com.github.luben
+ zstd-jni
+ 1.5.5-11
+
+
+ org.xerial.snappy
+ snappy-java
+ 1.1.8.4
+
+
+
+
+ org.apache.drill.exec
+ drill-java-exec
+ tests
+ ${project.version}
+ test
+
+
+ org.apache.drill
+ drill-common
+ tests
+ ${project.version}
+ test
+
+
+
+
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java
new file mode 100644
index 00000000000..7d410ae5f33
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.paimon.table.source.Split;
+
+public class PaimonCompleteWork implements CompleteWork {
+ private final EndpointByteMap byteMap;
+
+ private final Split split;
+
+ private final long totalBytes;
+
+ public PaimonCompleteWork(EndpointByteMap byteMap, Split split) {
+ this.byteMap = byteMap;
+ this.split = split;
+ long rowCount = split.rowCount();
+ this.totalBytes = rowCount > 0 ? rowCount : 1;
+ }
+
+ public Split getSplit() {
+ return split;
+ }
+
+ public long getRowCount() {
+ return split.rowCount();
+ }
+
+ @Override
+ public long getTotalBytes() {
+ return totalBytes;
+ }
+
+ @Override
+ public EndpointByteMap getByteMap() {
+ return byteMap;
+ }
+
+ @Override
+ public int compareTo(CompleteWork o) {
+ return 0;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java
new file mode 100644
index 00000000000..d344f9aafe5
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonTypeName("paimon-scan")
+@SuppressWarnings("unused")
+public class PaimonGroupScan extends AbstractGroupScan {
+
+ private static final Logger logger = LoggerFactory.getLogger(PaimonGroupScan.class);
+
+ private final PaimonFormatPlugin formatPlugin;
+
+ private final String path;
+
+ private final TupleMetadata schema;
+
+ private final LogicalExpression condition;
+
+ private final List columns;
+
+ private int maxRecords;
+
+ private List chunks;
+
+ private List endpointAffinities;
+
+ private ListMultimap mappings;
+
+ @JsonCreator
+ public PaimonGroupScan(
+ @JsonProperty("userName") String userName,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JsonProperty("columns") List columns,
+ @JsonProperty("schema") TupleMetadata schema,
+ @JsonProperty("path") String path,
+ @JsonProperty("condition") LogicalExpression condition,
+ @JsonProperty("maxRecords") Integer maxRecords,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
+ this(builder()
+ .userName(userName)
+ .formatPlugin(pluginRegistry.resolveFormat(storageConfig, formatConfig, PaimonFormatPlugin.class))
+ .schema(schema)
+ .path(path)
+ .condition(condition)
+ .columns(columns)
+ .maxRecords(maxRecords));
+ }
+
+ private PaimonGroupScan(PaimonGroupScanBuilder builder) throws IOException {
+ super(builder.userName);
+ this.formatPlugin = builder.formatPlugin;
+ this.columns = builder.columns;
+ this.path = builder.path;
+ this.schema = builder.schema;
+ this.condition = builder.condition;
+ this.maxRecords = builder.maxRecords;
+
+ init();
+ }
+
+ private PaimonGroupScan(PaimonGroupScan that) {
+ super(that);
+ this.columns = that.columns;
+ this.formatPlugin = that.formatPlugin;
+ this.path = that.path;
+ this.condition = that.condition;
+ this.schema = that.schema;
+ this.maxRecords = that.maxRecords;
+ this.chunks = that.chunks;
+ this.endpointAffinities = that.endpointAffinities;
+ this.mappings = that.mappings;
+ }
+
+ public static PaimonGroupScanBuilder builder() {
+ return new PaimonGroupScanBuilder();
+ }
+
+ @Override
+ public PaimonGroupScan clone(List columns) {
+ try {
+ return toBuilder().columns(columns).build();
+ } catch (IOException e) {
+ throw UserException.dataReadError(e)
+ .message("Failed to clone Paimon group scan")
+ .build(logger);
+ }
+ }
+
+ @Override
+ public PaimonGroupScan applyLimit(int maxRecords) {
+ PaimonGroupScan clone = new PaimonGroupScan(this);
+ clone.maxRecords = maxRecords;
+ return clone;
+ }
+
+ @Override
+ public void applyAssignments(List endpoints) {
+ mappings = AssignmentCreator.getMappings(endpoints, chunks);
+ }
+
+ private void createMappings(List affinities) {
+ List endpoints = affinities.stream()
+ .map(EndpointAffinity::getEndpoint)
+ .collect(Collectors.toList());
+ applyAssignments(endpoints);
+ }
+
+ @Override
+ public PaimonSubScan getSpecificScan(int minorFragmentId) {
+ if (chunks.isEmpty()) {
+ return emptySubScan();
+ }
+
+ if (mappings == null) {
+ createMappings(endpointAffinities);
+ }
+
+ List workList = mappings.get(minorFragmentId);
+ List paimonWorkList = workList == null
+ ? Collections.emptyList()
+ : convertWorkList(workList);
+
+ PaimonSubScan subScan = PaimonSubScan.builder()
+ .userName(userName)
+ .formatPlugin(formatPlugin)
+ .columns(columns)
+ .condition(condition)
+ .schema(schema)
+ .workList(paimonWorkList)
+ .path(path)
+ .maxRecords(maxRecords)
+ .build();
+
+ subScan.setOperatorId(getOperatorId());
+ return subScan;
+ }
+
+ private PaimonSubScan emptySubScan() {
+ PaimonSubScan subScan = PaimonSubScan.builder()
+ .userName(userName)
+ .formatPlugin(formatPlugin)
+ .columns(columns)
+ .condition(condition)
+ .schema(schema)
+ .workList(Collections.emptyList())
+ .path(path)
+ .maxRecords(maxRecords)
+ .build();
+ subScan.setOperatorId(getOperatorId());
+ return subScan;
+ }
+
+ private List convertWorkList(List workList) {
+ return workList.stream()
+ .map(PaimonCompleteWork::getSplit)
+ .map(PaimonWork::new)
+ .collect(Collectors.toList());
+ }
+
+ @JsonProperty("maxRecords")
+ public int getMaxRecords() {
+ return maxRecords;
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return Math.max(chunks.size(), 1);
+ }
+
+ @Override
+ public String getDigest() {
+ return toString();
+ }
+
+ @Override
+ public ScanStats getScanStats() {
+ long rowCount = chunks.stream()
+ .mapToLong(PaimonCompleteWork::getRowCount)
+ .sum();
+ long estimatedRecords = rowCount > 0
+ ? rowCount
+ : Math.max(chunks.size(), 1) * 1_000_000L;
+ return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, estimatedRecords, 1, 0);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new PaimonGroupScan(this);
+ }
+
+ private void init() throws IOException {
+ Table table = PaimonTableUtils.loadTable(formatPlugin, path);
+ String fileFormat = new CoreOptions(table.options()).fileFormatString();
+ // Paimon supports multiple formats; Drill currently reads Parquet/ORC only.
+ if (!"parquet".equalsIgnoreCase(fileFormat) && !"orc".equalsIgnoreCase(fileFormat)) {
+ throw UserException.unsupportedError()
+ .message("Paimon file format '%s' is not supported. Only parquet and orc are supported.", fileFormat)
+ .build(logger);
+ }
+
+ RowType rowType = table.rowType();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ PaimonReadUtils.applyFilter(readBuilder, rowType, condition);
+ PaimonReadUtils.applyProjection(readBuilder, rowType, columns);
+ TableScan tableScan = readBuilder.newScan();
+ List splits = tableScan.plan().splits();
+ chunks = splits.stream()
+ .map(split -> new PaimonCompleteWork(new EndpointByteMapImpl(), split))
+ .collect(Collectors.toList());
+ endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ }
+
+ @Override
+ public List getOperatorAffinity() {
+ if (endpointAffinities == null) {
+ logger.debug("Chunks size: {}", chunks.size());
+ endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ }
+ return endpointAffinities;
+ }
+
+ @Override
+ public boolean supportsLimitPushdown() {
+ return true;
+ }
+
+ @Override
+ @JsonProperty("columns")
+ public List getColumns() {
+ return columns;
+ }
+
+ @JsonProperty("schema")
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
+ @JsonProperty("storage")
+ public StoragePluginConfig getStorageConfig() {
+ return formatPlugin.getStorageConfig();
+ }
+
+ @JsonProperty("format")
+ public FormatPluginConfig getFormatConfig() {
+ return formatPlugin.getConfig();
+ }
+
+ @JsonProperty("path")
+ public String getPath() {
+ return path;
+ }
+
+ @JsonProperty("condition")
+ public LogicalExpression getCondition() {
+ return condition;
+ }
+
+ @JsonIgnore
+ public PaimonFormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ @Override
+ public String getOperatorType() {
+ return "PAIMON_GROUP_SCAN";
+ }
+
+ @Override
+ public String toString() {
+ String conditionString = condition == null ? null : ExpressionStringBuilder.toString(condition).trim();
+ return new PlanStringBuilder(this)
+ .field("path", path)
+ .field("schema", schema)
+ .field("columns", columns)
+ .field("condition", conditionString)
+ .field("maxRecords", maxRecords)
+ .toString();
+ }
+
+ public PaimonGroupScanBuilder toBuilder() {
+ return new PaimonGroupScanBuilder()
+ .userName(this.userName)
+ .formatPlugin(this.formatPlugin)
+ .schema(this.schema)
+ .path(this.path)
+ .condition(this.condition)
+ .columns(this.columns)
+ .maxRecords(this.maxRecords);
+ }
+
+ public static class PaimonGroupScanBuilder {
+ private String userName;
+
+ private PaimonFormatPlugin formatPlugin;
+
+ private TupleMetadata schema;
+
+ private String path;
+
+ private LogicalExpression condition;
+
+ private List columns;
+
+ private int maxRecords;
+
+ public PaimonGroupScanBuilder userName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder formatPlugin(PaimonFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder schema(TupleMetadata schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder path(String path) {
+ this.path = path;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder condition(LogicalExpression condition) {
+ this.condition = condition;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder columns(List columns) {
+ this.columns = columns;
+ return this;
+ }
+
+ public PaimonGroupScanBuilder maxRecords(int maxRecords) {
+ this.maxRecords = maxRecords;
+ return this;
+ }
+
+ public PaimonGroupScan build() throws IOException {
+ return new PaimonGroupScan(this);
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java
new file mode 100644
index 00000000000..1c696422654
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.paimon.plan.DrillExprToPaimonTranslator;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Utility class for common Paimon read operations such as applying filters
+ * and projections to ReadBuilder instances.
+ */
+public final class PaimonReadUtils {
+ private static final Logger logger = LoggerFactory.getLogger(PaimonReadUtils.class);
+
+ private PaimonReadUtils() {
+ // Utility class
+ }
+
+ /**
+ * Applies a filter expression to the Paimon ReadBuilder.
+ *
+ * @param readBuilder the Paimon ReadBuilder
+ * @param rowType the table row type
+ * @param condition the filter condition
+ */
+ public static void applyFilter(ReadBuilder readBuilder, RowType rowType, LogicalExpression condition) {
+ if (condition == null) {
+ return;
+ }
+ // Translate Drill expression into a Paimon predicate (if supported).
+ Predicate predicate = DrillExprToPaimonTranslator.translate(condition, rowType);
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+ }
+
+ /**
+ * Applies column projection to the Paimon ReadBuilder.
+ *
+ * @param readBuilder the Paimon ReadBuilder
+ * @param rowType the table row type
+ * @param columns the columns to project
+ */
+ public static void applyProjection(ReadBuilder readBuilder, RowType rowType, List columns) {
+ if (columns == null || columns.isEmpty()) {
+ return;
+ }
+
+ boolean hasStar = columns.stream().anyMatch(SchemaPath::isDynamicStar);
+ if (hasStar) {
+ return;
+ }
+
+ Set projectedNames = new HashSet<>();
+ List projection = new ArrayList<>();
+ for (SchemaPath column : columns) {
+ PathSegment segment = column.getRootSegment();
+ if (segment == null || !segment.isNamed()) {
+ continue;
+ }
+ String name = segment.getNameSegment().getPath();
+ if (!projectedNames.add(name)) {
+ continue;
+ }
+ int index = rowType.getFieldIndex(name);
+ if (index < 0) {
+ throw UserException.validationError()
+ .message("Paimon column not found: %s", name)
+ .build(logger);
+ }
+ projection.add(index);
+ }
+
+ if (!projection.isEmpty()) {
+ int[] projectionArray = projection.stream().mapToInt(Integer::intValue).toArray();
+ readBuilder.withProjection(projectionArray);
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java
new file mode 100644
index 00000000000..6f94462c1c4
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import com.google.common.base.Preconditions;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+@JsonTypeName("paimon-read")
+@SuppressWarnings("unused")
+public class PaimonSubScan extends AbstractBase implements SubScan {
+
+ private static final String OPERATOR_TYPE = "PAIMON_SUB_SCAN";
+
+ private final PaimonFormatPlugin formatPlugin;
+
+ private final List columns;
+
+ private final LogicalExpression condition;
+
+ private final TupleMetadata schema;
+
+ private final List workList;
+
+ private final String path;
+
+ private final int maxRecords;
+
+ @JsonCreator
+ public PaimonSubScan(
+ @JsonProperty("userName") String userName,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JsonProperty("columns") List columns,
+ @JsonProperty("path") String path,
+ @JsonProperty("workList") List workList,
+ @JsonProperty("schema") TupleMetadata schema,
+ @JsonProperty("condition") LogicalExpression condition,
+ @JsonProperty("maxRecords") Integer maxRecords,
+ @JacksonInject StoragePluginRegistry pluginRegistry) {
+ this.formatPlugin = pluginRegistry.resolveFormat(storageConfig, formatConfig, PaimonFormatPlugin.class);
+ this.columns = columns;
+ this.workList = workList;
+ this.path = path;
+ this.condition = condition;
+ this.schema = schema;
+ this.maxRecords = maxRecords;
+ }
+
+ private PaimonSubScan(PaimonSubScanBuilder builder) {
+ super(builder.userName);
+ this.formatPlugin = builder.formatPlugin;
+ this.columns = builder.columns;
+ this.condition = builder.condition;
+ this.schema = builder.schema;
+ this.workList = builder.workList;
+ this.path = builder.path;
+ this.maxRecords = builder.maxRecords;
+ }
+
+ public static PaimonSubScanBuilder builder() {
+ return new PaimonSubScanBuilder();
+ }
+
+ @Override
+ public T accept(
+ PhysicalVisitor physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ public List getWorkList() {
+ return workList;
+ }
+
+ public int getMaxRecords() {
+ return maxRecords;
+ }
+
+ public List getColumns() {
+ return columns;
+ }
+
+ public LogicalExpression getCondition() {
+ return condition;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return this.toBuilder().build();
+ }
+
+ @JsonProperty("storage")
+ public StoragePluginConfig getStorageConfig() {
+ return formatPlugin.getStorageConfig();
+ }
+
+ @JsonProperty("format")
+ public FormatPluginConfig getFormatConfig() {
+ return formatPlugin.getConfig();
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public String getOperatorType() {
+ return OPERATOR_TYPE;
+ }
+
+ @Override
+ public Iterator iterator() {
+ return Collections.emptyIterator();
+ }
+
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
+ @JsonIgnore
+ public PaimonFormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ public PaimonSubScanBuilder toBuilder() {
+ return new PaimonSubScanBuilder()
+ .userName(this.userName)
+ .formatPlugin(this.formatPlugin)
+ .columns(this.columns)
+ .condition(this.condition)
+ .schema(this.schema)
+ .workList(this.workList)
+ .path(this.path)
+ .maxRecords(this.maxRecords);
+ }
+
+ public static class PaimonSubScanBuilder {
+ private String userName;
+
+ private PaimonFormatPlugin formatPlugin;
+
+ private List columns;
+
+ private LogicalExpression condition;
+
+ private TupleMetadata schema;
+
+ private List workList;
+
+ private String path;
+
+ private int maxRecords;
+
+ public PaimonSubScanBuilder userName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public PaimonSubScanBuilder formatPlugin(PaimonFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ return this;
+ }
+
+ public PaimonSubScanBuilder columns(List columns) {
+ this.columns = columns;
+ return this;
+ }
+
+ public PaimonSubScanBuilder condition(LogicalExpression condition) {
+ this.condition = condition;
+ return this;
+ }
+
+ public PaimonSubScanBuilder schema(TupleMetadata schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public PaimonSubScanBuilder workList(List workList) {
+ this.workList = workList;
+ return this;
+ }
+
+ public PaimonSubScanBuilder path(String path) {
+ this.path = path;
+ return this;
+ }
+
+ public PaimonSubScanBuilder maxRecords(int maxRecords) {
+ this.maxRecords = maxRecords;
+ return this;
+ }
+
+ public PaimonSubScan build() {
+ return new PaimonSubScan(this);
+ }
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java
new file mode 100644
index 00000000000..6979799a3bd
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatLocationTransformer;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig;
+import org.apache.drill.exec.store.paimon.format.PaimonMetadataType;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.system.SystemTableLoader;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class PaimonTableUtils {
+ private PaimonTableUtils() {
+ }
+
+ /**
+ * Load a Paimon table directly from a filesystem path. If a metadata suffix is present,
+ * returns the corresponding system table; otherwise returns the data table.
+ */
+ public static Table loadTable(PaimonFormatPlugin formatPlugin, String path) throws IOException {
+ PaimonMetadataType metadataType = extractMetadataType(path);
+ String tableLocation = metadataType == null ? path : stripMetadataType(path);
+ Path tablePath = new Path(tableLocation);
+ Options options = new Options();
+ CatalogContext context = CatalogContext.create(options, formatPlugin.getFsConf());
+ FileIO fileIO = FileIO.get(tablePath, context);
+ FileStoreTable table = FileStoreTableFactory.create(fileIO, tablePath);
+ // Apply time-travel and custom options at table load time.
+ Map dynamicOptions = buildDynamicOptions(formatPlugin.getConfig());
+ if (!dynamicOptions.isEmpty()) {
+ table = table.copy(dynamicOptions);
+ }
+ if (metadataType == null) {
+ return table;
+ }
+ Table metadataTable = SystemTableLoader.load(metadataType.getName(), table);
+ Preconditions.checkArgument(metadataTable != null, "Unsupported metadata table: %s", metadataType.getName());
+ return metadataTable;
+ }
+
+ private static PaimonMetadataType extractMetadataType(String location) {
+ int index = location.lastIndexOf(PaimonFormatLocationTransformer.METADATA_SEPARATOR);
+ if (index < 0) {
+ return null;
+ }
+ String metadataName = location.substring(index + 1);
+ return PaimonMetadataType.from(metadataName);
+ }
+
+ private static String stripMetadataType(String location) {
+ int index = location.lastIndexOf(PaimonFormatLocationTransformer.METADATA_SEPARATOR);
+ return index < 0 ? location : location.substring(0, index);
+ }
+
+ private static Map buildDynamicOptions(PaimonFormatPluginConfig config) {
+ Map dynamicOptions = new HashMap<>();
+ if (config == null) {
+ return dynamicOptions;
+ }
+ if (config.getProperties() != null) {
+ dynamicOptions.putAll(config.getProperties());
+ }
+
+ Long snapshotId = config.getSnapshotId();
+ Long snapshotAsOfTime = config.getSnapshotAsOfTime();
+ Preconditions.checkArgument(snapshotId == null || snapshotAsOfTime == null,
+ "Both 'snapshotId' and 'snapshotAsOfTime' cannot be specified");
+ if (snapshotId != null) {
+ dynamicOptions.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_SNAPSHOT.toString());
+ dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId.toString());
+ } else if (snapshotAsOfTime != null) {
+ dynamicOptions.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_TIMESTAMP.toString());
+ dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), snapshotAsOfTime.toString());
+ }
+
+ return dynamicOptions;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java
new file mode 100644
index 00000000000..ff06d60de89
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.paimon.table.source.Split;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.Objects;
+
+@JsonSerialize(using = PaimonWork.PaimonWorkSerializer.class)
+@JsonDeserialize(using = PaimonWork.PaimonWorkDeserializer.class)
+public class PaimonWork {
+ private final Split split;
+
+ public PaimonWork(Split split) {
+ this.split = split;
+ }
+
+ public Split getSplit() {
+ return split;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PaimonWork that = (PaimonWork) o;
+ return Objects.equals(split, that.split);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(split);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("split", split)
+ .toString();
+ }
+
+ public static class PaimonWorkDeserializer extends StdDeserializer {
+
+ private static final Logger logger = LoggerFactory.getLogger(PaimonWorkDeserializer.class);
+
+ public PaimonWorkDeserializer() {
+ super(PaimonWork.class);
+ }
+
+ @Override
+ public PaimonWork deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ JsonNode node = p.getCodec().readTree(p);
+ String splitString = node.get(PaimonWorkSerializer.SPLIT_FIELD).asText();
+
+ byte[] decoded = Base64.getDecoder().decode(splitString);
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(decoded))) {
+ Object split = ois.readObject();
+ if (!(split instanceof Split)) {
+ throw UserException.dataReadError()
+ .message("Deserialized object is not a Paimon Split: %s", split.getClass().getName())
+ .build(logger);
+ }
+ return new PaimonWork((Split) split);
+ } catch (ClassNotFoundException e) {
+ logger.error("Failed to deserialize Paimon Split: {}", e.getMessage(), e);
+ throw UserException.dataReadError(e)
+ .message("Failed to deserialize Paimon Split: %s", e.getMessage())
+ .build(logger);
+ }
+ }
+ }
+
+ public static class PaimonWorkSerializer extends StdSerializer {
+
+ public static final String SPLIT_FIELD = "split";
+
+ public PaimonWorkSerializer() {
+ super(PaimonWork.class);
+ }
+
+ @Override
+ public void serialize(PaimonWork value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeStartObject();
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(value.split);
+ oos.flush();
+ gen.writeStringField(SPLIT_FIELD, Base64.getEncoder().encodeToString(baos.toByteArray()));
+ }
+ gen.writeEndObject();
+ }
+ }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java
new file mode 100644
index 00000000000..43b0e588546
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatLocationTransformer;
+
+import java.util.function.Function;
+
+public class PaimonFormatLocationTransformer implements FormatLocationTransformer {
+ public static final FormatLocationTransformer INSTANCE = new PaimonFormatLocationTransformer();
+
+ // Paimon metadata tables are addressed via suffix: e.g. /path/to/table#snapshots
+ public static final String METADATA_SEPARATOR = "#";
+
+ @Override
+ public boolean canTransform(String location) {
+ PaimonMetadataType metadataType = getMetadataType(location);
+ if (metadataType == null) {
+ return false;
+ }
+ return true;
+ }
+
+ private PaimonMetadataType getMetadataType(String location) {
+ String metadataType = StringUtils.substringAfterLast(location, METADATA_SEPARATOR);
+ if (StringUtils.isNotEmpty(metadataType)) {
+ return PaimonMetadataType.from(metadataType);
+ }
+ return null;
+ }
+
+ @Override
+ public FileSelection transform(String location, Function selectionFactory) {
+ PaimonMetadataType metadataType = getMetadataType(location);
+ location = StringUtils.substringBeforeLast(location, METADATA_SEPARATOR);
+ FileSelection fileSelection = selectionFactory.apply(location);
+ if (fileSelection == null) {
+ return null;
+ }
+ // Preserve metadata type while keeping the base path selection.
+ return new PaimonMetadataFileSelection(fileSelection, metadataType);
+ }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java
new file mode 100644
index 00000000000..bacdbe3c73a
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatLocationTransformer;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+public class PaimonFormatMatcher extends FormatMatcher {
+ private static final String SNAPSHOT_DIR_NAME = "snapshot";
+ private static final String SCHEMA_DIR_NAME = "schema";
+
+ private final PaimonFormatPlugin formatPlugin;
+
+ public PaimonFormatMatcher(PaimonFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ }
+
+ @Override
+ public boolean supportDirectoryReads() {
+ return true;
+ }
+
+ @Override
+ public DrillTable isReadable(DrillFileSystem fs, FileSelection selection, FileSystemPlugin fsPlugin,
+ String storageEngineName, SchemaConfig schemaConfig) throws IOException {
+ Path selectionRoot = selection.getSelectionRoot();
+ Path snapshotDir = new Path(selectionRoot, SNAPSHOT_DIR_NAME);
+ Path schemaDir = new Path(selectionRoot, SCHEMA_DIR_NAME);
+ if (fs.isDirectory(selectionRoot)
+ && fs.exists(snapshotDir) && fs.isDirectory(snapshotDir)
+ && fs.exists(schemaDir) && fs.isDirectory(schemaDir)) {
+ FormatSelection formatSelection = new FormatSelection(formatPlugin.getConfig(), selection);
+ return new PluginDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
+ formatSelection, formatPlugin.getConvention());
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isFileReadable(DrillFileSystem fs, FileStatus status) {
+ return false;
+ }
+
+ @Override
+ public FormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ @Override
+ public int priority() {
+ return HIGH_PRIORITY;
+ }
+
+ @Override
+ public FormatLocationTransformer getFormatLocationTransformer() {
+ return PaimonFormatLocationTransformer.INSTANCE;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java
new file mode 100644
index 00000000000..d778b3f88c6
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PaimonFormatPlugin implements FormatPlugin {
+
+ private static final String PAIMON_CONVENTION_PREFIX = "PAIMON.";
+
+ private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+ private final FileSystemConfig storageConfig;
+
+ private final PaimonFormatPluginConfig config;
+
+ private final Configuration fsConf;
+
+ private final DrillbitContext context;
+
+ private final String name;
+
+ private final PaimonFormatMatcher matcher;
+
+ private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+ public PaimonFormatPlugin(
+ String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ FileSystemConfig storageConfig,
+ PaimonFormatPluginConfig config) {
+ this.storageConfig = storageConfig;
+ this.config = config;
+ this.fsConf = fsConf;
+ this.context = context;
+ this.name = name;
+ this.matcher = new PaimonFormatMatcher(this);
+ this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+ }
+
+ private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+ Convention convention = new Convention.Impl(PAIMON_CONVENTION_PREFIX + name, PluginRel.class);
+ return StoragePluginRulesSupplier.builder()
+ .rulesProvider(new PluginRulesProviderImpl(convention, PaimonPluginImplementor::new))
+ .supportsFilterPushdown(true)
+ .supportsProjectPushdown(true)
+ .supportsLimitPushdown(true)
+ .convention(convention)
+ .build();
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsAutoPartitioning() {
+ return false;
+ }
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return matcher;
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location, List partitionColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return storagePluginRulesSupplier.getOptimizerRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List columns) throws IOException {
+ return PaimonGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .path(getPath(selection))
+ .columns(columns)
+ .maxRecords(-1)
+ .build();
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List columns, MetadataProviderManager metadataProviderManager) throws IOException {
+ SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+ TupleMetadata schema = null;
+ if (schemaProvider != null) {
+ schema = schemaProvider.read().getSchema();
+ }
+ return PaimonGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .schema(schema)
+ .path(getPath(selection))
+ .columns(columns)
+ .maxRecords(-1)
+ .build();
+ }
+
+ @Override
+ public boolean supportsStatistics() {
+ return false;
+ }
+
+ @Override
+ public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public PaimonFormatPluginConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public FileSystemConfig getStorageConfig() {
+ return storageConfig;
+ }
+
+ @Override
+ public Configuration getFsConf() {
+ return fsConf;
+ }
+
+ @Override
+ public DrillbitContext getContext() {
+ return context;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ public Convention getConvention() {
+ return storagePluginRulesSupplier.convention();
+ }
+
+ private String getPath(FileSelection selection) {
+ String path = selection.getSelectionRoot().toString();
+ if (selection instanceof PaimonMetadataFileSelection) {
+ PaimonMetadataFileSelection metadataFileSelection = (PaimonMetadataFileSelection) selection;
+ // Map dfs.`/path/table#snapshots` to Paimon system table.
+ path = String.format("%s%s%s", path, PaimonFormatLocationTransformer.METADATA_SEPARATOR,
+ metadataFileSelection.getMetadataType().getName());
+ }
+ return path;
+ }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java
new file mode 100644
index 00000000000..f30c0b4f826
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import java.util.Map;
+import java.util.Objects;
+
+@JsonTypeName(PaimonFormatPluginConfig.NAME)
+@JsonDeserialize(builder = PaimonFormatPluginConfig.PaimonFormatPluginConfigBuilder.class)
+public class PaimonFormatPluginConfig implements FormatPluginConfig {
+
+ public static final String NAME = "paimon";
+
+ private final Map properties;
+
+ // Time travel: load a specific snapshot id.
+ private final Long snapshotId;
+
+ // Time travel: load the latest snapshot at or before the given timestamp (millis).
+ private final Long snapshotAsOfTime;
+
+ @JsonCreator
+ public PaimonFormatPluginConfig(PaimonFormatPluginConfigBuilder builder) {
+ this.properties = builder.properties;
+ this.snapshotId = builder.snapshotId;
+ this.snapshotAsOfTime = builder.snapshotAsOfTime;
+ }
+
+ public static PaimonFormatPluginConfigBuilder builder() {
+ return new PaimonFormatPluginConfigBuilder();
+ }
+
+ public Map getProperties() {
+ return properties;
+ }
+
+ public Long getSnapshotId() {
+ return snapshotId;
+ }
+
+ public Long getSnapshotAsOfTime() {
+ return snapshotAsOfTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PaimonFormatPluginConfig that = (PaimonFormatPluginConfig) o;
+ return Objects.equals(properties, that.properties)
+ && Objects.equals(snapshotId, that.snapshotId)
+ && Objects.equals(snapshotAsOfTime, that.snapshotAsOfTime);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties, snapshotId, snapshotAsOfTime);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("properties", properties)
+ .field("snapshotId", snapshotId)
+ .field("snapshotAsOfTime", snapshotAsOfTime)
+ .toString();
+ }
+
+ @JsonPOJOBuilder(withPrefix = "")
+ public static class PaimonFormatPluginConfigBuilder {
+ private Map properties;
+
+ private Long snapshotId;
+
+ private Long snapshotAsOfTime;
+
+ public PaimonFormatPluginConfigBuilder properties(Map properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ public PaimonFormatPluginConfigBuilder snapshotId(Long snapshotId) {
+ this.snapshotId = snapshotId;
+ return this;
+ }
+
+ public PaimonFormatPluginConfigBuilder snapshotAsOfTime(Long snapshotAsOfTime) {
+ this.snapshotAsOfTime = snapshotAsOfTime;
+ return this;
+ }
+
+ public PaimonFormatPluginConfig build() {
+ return new PaimonFormatPluginConfig(this);
+ }
+ }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java
new file mode 100644
index 00000000000..ed90d18d92f
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import org.apache.drill.exec.store.dfs.FileSelection;
+
+public class PaimonMetadataFileSelection extends FileSelection {
+ private final PaimonMetadataType metadataType;
+
+ protected PaimonMetadataFileSelection(FileSelection selection, PaimonMetadataType metadataType) {
+ super(selection);
+ this.metadataType = metadataType;
+ }
+
+ public PaimonMetadataType getMetadataType() {
+ return metadataType;
+ }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java
new file mode 100644
index 00000000000..0cdec7345c1
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.format;
+
+import java.util.Locale;
+
+/**
+ * Paimon system tables exposed via table path suffix, e.g. /path/table#snapshots.
+ */
+public enum PaimonMetadataType {
+ SNAPSHOTS("snapshots"),
+ SCHEMAS("schemas"),
+ FILES("files"),
+ MANIFESTS("manifests");
+
+ private final String name;
+
+ PaimonMetadataType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static PaimonMetadataType from(String value) {
+ if (value == null) {
+ return null;
+ }
+ String normalized = value.toLowerCase(Locale.ROOT);
+ for (PaimonMetadataType type : values()) {
+ if (type.name.equals(normalized)) {
+ return type;
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java
new file mode 100644
index 00000000000..16942fe4c8e
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.plan;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.NullExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Optional;
+
+public class DrillExprToPaimonTranslator
+ extends AbstractExprVisitor