diff --git a/contrib/format-paimon/README.md b/contrib/format-paimon/README.md
new file mode 100644
index 00000000000..224405dacc5
--- /dev/null
+++ b/contrib/format-paimon/README.md
@@ -0,0 +1,96 @@
+# Apache Paimon format plugin
+
+This format plugin enables Drill to query Apache Paimon tables.
+
+Unlike regular format plugins, which match individual files, this plugin matches a directory: a Paimon table is a
+folder containing data and metadata files, and Drill checks for the presence of the `snapshot` and `schema`
+subdirectories to confirm that the folder is a Paimon table.
+
+Drill reads Paimon tables through the Paimon Java API and supports the Parquet and ORC file formats.
+There is no need to specify the underlying file format; it is discovered automatically.
+
+For details on the Apache Paimon table format, please refer to the [official docs](https://paimon.apache.org/).
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project pushdown optimization.
+
+With project pushdown, only the columns referenced in the query are read. In conjunction with column-oriented
+formats like Parquet or ORC, this can improve read performance significantly.
+
+### Filter pushdown
+
+This format plugin supports filter pushdown optimization.
+
+With filter pushdown, filter expressions supported by the Paimon API are pushed down to the scan, so only data that
+matches the filter expression is read.
+
+### Limit pushdown
+
+This format plugin supports limit pushdown optimization.
+
+The limit is pushed down to Paimon scan planning to reduce the amount of data read.
+
+### Querying table metadata
+
+Apache Drill provides the ability to query table metadata exposed by Paimon.
+
+Apache Paimon currently exposes the following metadata kinds:
+
+* SNAPSHOTS
+* SCHEMAS
+* FILES
+* MANIFESTS
+
+To query specific metadata, add the `#metadata_name` suffix to the table location, as in the following example:
+
+```sql
+SELECT *
+FROM dfs.tmp.`testTable#snapshots`;
+```
+
+### Querying specific table versions (time travel)
+
+Apache Paimon tracks table modifications as snapshots, which makes it possible to read the table as it existed at a
+specific point in its history.
+
+This format plugin embraces this ability and provides an easy-to-use way of triggering it.
+
+The following ways of specifying the table version are supported:
+
+- `snapshotId` - id of the specific snapshot
+- `snapshotAsOfTime` - the most recent snapshot as of the given time in milliseconds
+
+A table function can be used to specify one of the above options in the following way:
+
+```sql
+SELECT *
+FROM table(dfs.tmp.testTable(type => 'paimon', snapshotId => 1));
+
+SELECT *
+FROM table(dfs.tmp.testTable(type => 'paimon', snapshotAsOfTime => 1736345510000));
+```
+
+Note: `snapshotId` and `snapshotAsOfTime` are mutually exclusive and cannot be specified at the same time.
+
+## Configuration
+
+The only required configuration option is:
+
+- `type` - format plugin type, must be `'paimon'`
+
+Note: `snapshotId` and `snapshotAsOfTime` for time travel queries are specified at query time using the `table()` function.
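+
+Once the plugin is configured, queries need no special syntax. For illustration, assuming a Paimon table at
+`dfs.tmp.testTable` with hypothetical columns `first_name` and `last_name`, the following query exercises the
+project, filter, and limit pushdowns described above:
+
+```sql
+SELECT first_name, last_name   -- project pushdown: only these columns are read
+FROM dfs.tmp.`testTable`
+WHERE last_name = 'Smith'      -- filter pushdown: translated into a Paimon predicate
+LIMIT 10;                      -- limit pushdown: applied during Paimon scan planning
+```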
+
+### Format config example:
+
+```json
+{
+  "type": "file",
+  "formats": {
+    "paimon": {
+      "type": "paimon"
+    }
+  }
+}
+```
diff --git a/contrib/format-paimon/pom.xml b/contrib/format-paimon/pom.xml
new file mode 100644
index 00000000000..0bc02a9da0a
--- /dev/null
+++ b/contrib/format-paimon/pom.xml
@@ -0,0 +1,123 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>1.23.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-paimon-format</artifactId>
+
+  <name>Drill : Contrib : Format : Paimon</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-core</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-common</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-api</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-codegen-loader</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-format</artifactId>
+      <version>${paimon.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-shade-jackson-2</artifactId>
+      <version>2.14.2-0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-shade-guava-30</artifactId>
+      <version>30.1.1-jre-0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-shade-caffeine-2</artifactId>
+      <version>2.9.3-0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.paimon</groupId>
+      <artifactId>paimon-shade-netty-4</artifactId>
+      <version>4.1.100.Final-0.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+      <version>0.27</version>
+    </dependency>
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <version>1.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>1.5.5-11</version>
+    </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.1.8.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java
new file mode 100644
index 00000000000..7d410ae5f33
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonCompleteWork.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.paimon.table.source.Split;
+
+public class PaimonCompleteWork implements CompleteWork {
+  private final EndpointByteMap byteMap;
+
+  private final Split split;
+
+  private final long totalBytes;
+
+  public PaimonCompleteWork(EndpointByteMap byteMap, Split split) {
+    this.byteMap = byteMap;
+    this.split = split;
+    long rowCount = split.rowCount();
+    this.totalBytes = rowCount > 0 ? rowCount : 1;
+  }
+
+  public Split getSplit() {
+    return split;
+  }
+
+  public long getRowCount() {
+    return split.rowCount();
+  }
+
+  @Override
+  public long getTotalBytes() {
+    return totalBytes;
+  }
+
+  @Override
+  public EndpointByteMap getByteMap() {
+    return byteMap;
+  }
+
+  @Override
+  public int compareTo(CompleteWork o) {
+    return 0;
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java
new file mode 100644
index 00000000000..d344f9aafe5
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonGroupScan.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonTypeName("paimon-scan")
+@SuppressWarnings("unused") +public class PaimonGroupScan extends AbstractGroupScan { + + private static final Logger logger = LoggerFactory.getLogger(PaimonGroupScan.class); + + private final PaimonFormatPlugin formatPlugin; + + private final String path; + + private final TupleMetadata schema; + + private final LogicalExpression condition; + + private final List columns; + + private int maxRecords; + + private List chunks; + + private List endpointAffinities; + + private ListMultimap mappings; + + @JsonCreator + public PaimonGroupScan( + @JsonProperty("userName") String userName, + @JsonProperty("storage") StoragePluginConfig storageConfig, + @JsonProperty("format") FormatPluginConfig formatConfig, + @JsonProperty("columns") List columns, + @JsonProperty("schema") TupleMetadata schema, + @JsonProperty("path") String path, + @JsonProperty("condition") LogicalExpression condition, + @JsonProperty("maxRecords") Integer maxRecords, + @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException { + this(builder() + .userName(userName) + .formatPlugin(pluginRegistry.resolveFormat(storageConfig, formatConfig, PaimonFormatPlugin.class)) + .schema(schema) + .path(path) + .condition(condition) + .columns(columns) + .maxRecords(maxRecords)); + } + + private PaimonGroupScan(PaimonGroupScanBuilder builder) throws IOException { + super(builder.userName); + this.formatPlugin = builder.formatPlugin; + this.columns = builder.columns; + this.path = builder.path; + this.schema = builder.schema; + this.condition = builder.condition; + this.maxRecords = builder.maxRecords; + + init(); + } + + private PaimonGroupScan(PaimonGroupScan that) { + super(that); + this.columns = that.columns; + this.formatPlugin = that.formatPlugin; + this.path = that.path; + this.condition = that.condition; + this.schema = that.schema; + this.maxRecords = that.maxRecords; + this.chunks = that.chunks; + this.endpointAffinities = that.endpointAffinities; + this.mappings = that.mappings; + } + + public static PaimonGroupScanBuilder builder() { + return new PaimonGroupScanBuilder(); + } + + @Override + public PaimonGroupScan clone(List columns) { + try { + return toBuilder().columns(columns).build(); + } catch (IOException e) { + throw UserException.dataReadError(e) + .message("Failed to clone Paimon group scan") + .build(logger); + } + } + + @Override + public PaimonGroupScan applyLimit(int maxRecords) { + PaimonGroupScan clone = new PaimonGroupScan(this); + clone.maxRecords = maxRecords; + return clone; + } + + @Override + public void applyAssignments(List endpoints) { + mappings = AssignmentCreator.getMappings(endpoints, chunks); + } + + private void createMappings(List affinities) { + List endpoints = affinities.stream() + .map(EndpointAffinity::getEndpoint) + .collect(Collectors.toList()); + applyAssignments(endpoints); + } + + @Override + public PaimonSubScan getSpecificScan(int minorFragmentId) { + if (chunks.isEmpty()) { + return emptySubScan(); + } + + if (mappings == null) { + createMappings(endpointAffinities); + } + + List workList = mappings.get(minorFragmentId); + List paimonWorkList = workList == null + ? 
Collections.emptyList() + : convertWorkList(workList); + + PaimonSubScan subScan = PaimonSubScan.builder() + .userName(userName) + .formatPlugin(formatPlugin) + .columns(columns) + .condition(condition) + .schema(schema) + .workList(paimonWorkList) + .path(path) + .maxRecords(maxRecords) + .build(); + + subScan.setOperatorId(getOperatorId()); + return subScan; + } + + private PaimonSubScan emptySubScan() { + PaimonSubScan subScan = PaimonSubScan.builder() + .userName(userName) + .formatPlugin(formatPlugin) + .columns(columns) + .condition(condition) + .schema(schema) + .workList(Collections.emptyList()) + .path(path) + .maxRecords(maxRecords) + .build(); + subScan.setOperatorId(getOperatorId()); + return subScan; + } + + private List convertWorkList(List workList) { + return workList.stream() + .map(PaimonCompleteWork::getSplit) + .map(PaimonWork::new) + .collect(Collectors.toList()); + } + + @JsonProperty("maxRecords") + public int getMaxRecords() { + return maxRecords; + } + + @Override + public int getMaxParallelizationWidth() { + return Math.max(chunks.size(), 1); + } + + @Override + public String getDigest() { + return toString(); + } + + @Override + public ScanStats getScanStats() { + long rowCount = chunks.stream() + .mapToLong(PaimonCompleteWork::getRowCount) + .sum(); + long estimatedRecords = rowCount > 0 + ? rowCount + : Math.max(chunks.size(), 1) * 1_000_000L; + return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, estimatedRecords, 1, 0); + } + + @Override + public PhysicalOperator getNewWithChildren(List children) { + Preconditions.checkArgument(children.isEmpty()); + return new PaimonGroupScan(this); + } + + private void init() throws IOException { + Table table = PaimonTableUtils.loadTable(formatPlugin, path); + String fileFormat = new CoreOptions(table.options()).fileFormatString(); + // Paimon supports multiple formats; Drill currently reads Parquet/ORC only. + if (!"parquet".equalsIgnoreCase(fileFormat) && !"orc".equalsIgnoreCase(fileFormat)) { + throw UserException.unsupportedError() + .message("Paimon file format '%s' is not supported. 
Only parquet and orc are supported.", fileFormat) + .build(logger); + } + + RowType rowType = table.rowType(); + ReadBuilder readBuilder = table.newReadBuilder(); + PaimonReadUtils.applyFilter(readBuilder, rowType, condition); + PaimonReadUtils.applyProjection(readBuilder, rowType, columns); + TableScan tableScan = readBuilder.newScan(); + List splits = tableScan.plan().splits(); + chunks = splits.stream() + .map(split -> new PaimonCompleteWork(new EndpointByteMapImpl(), split)) + .collect(Collectors.toList()); + endpointAffinities = AffinityCreator.getAffinityMap(chunks); + } + + @Override + public List getOperatorAffinity() { + if (endpointAffinities == null) { + logger.debug("Chunks size: {}", chunks.size()); + endpointAffinities = AffinityCreator.getAffinityMap(chunks); + } + return endpointAffinities; + } + + @Override + public boolean supportsLimitPushdown() { + return true; + } + + @Override + @JsonProperty("columns") + public List getColumns() { + return columns; + } + + @JsonProperty("schema") + public TupleMetadata getSchema() { + return schema; + } + + @JsonProperty("storage") + public StoragePluginConfig getStorageConfig() { + return formatPlugin.getStorageConfig(); + } + + @JsonProperty("format") + public FormatPluginConfig getFormatConfig() { + return formatPlugin.getConfig(); + } + + @JsonProperty("path") + public String getPath() { + return path; + } + + @JsonProperty("condition") + public LogicalExpression getCondition() { + return condition; + } + + @JsonIgnore + public PaimonFormatPlugin getFormatPlugin() { + return formatPlugin; + } + + @Override + public String getOperatorType() { + return "PAIMON_GROUP_SCAN"; + } + + @Override + public String toString() { + String conditionString = condition == null ? null : ExpressionStringBuilder.toString(condition).trim(); + return new PlanStringBuilder(this) + .field("path", path) + .field("schema", schema) + .field("columns", columns) + .field("condition", conditionString) + .field("maxRecords", maxRecords) + .toString(); + } + + public PaimonGroupScanBuilder toBuilder() { + return new PaimonGroupScanBuilder() + .userName(this.userName) + .formatPlugin(this.formatPlugin) + .schema(this.schema) + .path(this.path) + .condition(this.condition) + .columns(this.columns) + .maxRecords(this.maxRecords); + } + + public static class PaimonGroupScanBuilder { + private String userName; + + private PaimonFormatPlugin formatPlugin; + + private TupleMetadata schema; + + private String path; + + private LogicalExpression condition; + + private List columns; + + private int maxRecords; + + public PaimonGroupScanBuilder userName(String userName) { + this.userName = userName; + return this; + } + + public PaimonGroupScanBuilder formatPlugin(PaimonFormatPlugin formatPlugin) { + this.formatPlugin = formatPlugin; + return this; + } + + public PaimonGroupScanBuilder schema(TupleMetadata schema) { + this.schema = schema; + return this; + } + + public PaimonGroupScanBuilder path(String path) { + this.path = path; + return this; + } + + public PaimonGroupScanBuilder condition(LogicalExpression condition) { + this.condition = condition; + return this; + } + + public PaimonGroupScanBuilder columns(List columns) { + this.columns = columns; + return this; + } + + public PaimonGroupScanBuilder maxRecords(int maxRecords) { + this.maxRecords = maxRecords; + return this; + } + + public PaimonGroupScan build() throws IOException { + return new PaimonGroupScan(this); + } + } +} diff --git 
a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java new file mode 100644 index 00000000000..1c696422654 --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonReadUtils.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.store.paimon.plan.DrillExprToPaimonTranslator; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Utility class for common Paimon read operations such as applying filters + * and projections to ReadBuilder instances. + */ +public final class PaimonReadUtils { + private static final Logger logger = LoggerFactory.getLogger(PaimonReadUtils.class); + + private PaimonReadUtils() { + // Utility class + } + + /** + * Applies a filter expression to the Paimon ReadBuilder. + * + * @param readBuilder the Paimon ReadBuilder + * @param rowType the table row type + * @param condition the filter condition + */ + public static void applyFilter(ReadBuilder readBuilder, RowType rowType, LogicalExpression condition) { + if (condition == null) { + return; + } + // Translate Drill expression into a Paimon predicate (if supported). + Predicate predicate = DrillExprToPaimonTranslator.translate(condition, rowType); + if (predicate != null) { + readBuilder.withFilter(predicate); + } + } + + /** + * Applies column projection to the Paimon ReadBuilder. 
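+ * Dynamic star ({@code SELECT *}) queries leave the builder unprojected so that all columns are read; otherwise
+ * only top-level column names are projected, duplicates are skipped, and unknown columns are rejected.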
+ * + * @param readBuilder the Paimon ReadBuilder + * @param rowType the table row type + * @param columns the columns to project + */ + public static void applyProjection(ReadBuilder readBuilder, RowType rowType, List columns) { + if (columns == null || columns.isEmpty()) { + return; + } + + boolean hasStar = columns.stream().anyMatch(SchemaPath::isDynamicStar); + if (hasStar) { + return; + } + + Set projectedNames = new HashSet<>(); + List projection = new ArrayList<>(); + for (SchemaPath column : columns) { + PathSegment segment = column.getRootSegment(); + if (segment == null || !segment.isNamed()) { + continue; + } + String name = segment.getNameSegment().getPath(); + if (!projectedNames.add(name)) { + continue; + } + int index = rowType.getFieldIndex(name); + if (index < 0) { + throw UserException.validationError() + .message("Paimon column not found: %s", name) + .build(logger); + } + projection.add(index); + } + + if (!projection.isEmpty()) { + int[] projectionArray = projection.stream().mapToInt(Integer::intValue).toArray(); + readBuilder.withProjection(projectionArray); + } + } +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java new file mode 100644 index 00000000000..6f94462c1c4 --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonSubScan.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.paimon; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin; +import com.google.common.base.Preconditions; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +@JsonTypeName("paimon-read") +@SuppressWarnings("unused") +public class PaimonSubScan extends AbstractBase implements SubScan { + + private static final String OPERATOR_TYPE = "PAIMON_SUB_SCAN"; + + private final PaimonFormatPlugin formatPlugin; + + private final List columns; + + private final LogicalExpression condition; + + private final TupleMetadata schema; + + private final List workList; + + private final String path; + + private final int maxRecords; + + @JsonCreator + public PaimonSubScan( + @JsonProperty("userName") String userName, + @JsonProperty("storage") StoragePluginConfig storageConfig, + @JsonProperty("format") FormatPluginConfig formatConfig, + @JsonProperty("columns") List columns, + @JsonProperty("path") String path, + @JsonProperty("workList") List workList, + @JsonProperty("schema") TupleMetadata schema, + @JsonProperty("condition") LogicalExpression condition, + @JsonProperty("maxRecords") Integer maxRecords, + @JacksonInject StoragePluginRegistry pluginRegistry) { + this.formatPlugin = pluginRegistry.resolveFormat(storageConfig, formatConfig, PaimonFormatPlugin.class); + this.columns = columns; + this.workList = workList; + this.path = path; + this.condition = condition; + this.schema = schema; + this.maxRecords = maxRecords; + } + + private PaimonSubScan(PaimonSubScanBuilder builder) { + super(builder.userName); + this.formatPlugin = builder.formatPlugin; + this.columns = builder.columns; + this.condition = builder.condition; + this.schema = builder.schema; + this.workList = builder.workList; + this.path = builder.path; + this.maxRecords = builder.maxRecords; + } + + public static PaimonSubScanBuilder builder() { + return new PaimonSubScanBuilder(); + } + + @Override + public T accept( + PhysicalVisitor physicalVisitor, X value) throws E { + return physicalVisitor.visitSubScan(this, value); + } + + public List getWorkList() { + return workList; + } + + public int getMaxRecords() { + return maxRecords; + } + + public List getColumns() { + return columns; + } + + public LogicalExpression getCondition() { + return condition; + } + + @Override + public PhysicalOperator getNewWithChildren(List children) { + Preconditions.checkArgument(children.isEmpty()); + return this.toBuilder().build(); + } + + @JsonProperty("storage") + public StoragePluginConfig getStorageConfig() { + return formatPlugin.getStorageConfig(); + } + + @JsonProperty("format") + public 
FormatPluginConfig getFormatConfig() { + return formatPlugin.getConfig(); + } + + public String getPath() { + return path; + } + + @Override + public String getOperatorType() { + return OPERATOR_TYPE; + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + + public TupleMetadata getSchema() { + return schema; + } + + @JsonIgnore + public PaimonFormatPlugin getFormatPlugin() { + return formatPlugin; + } + + public PaimonSubScanBuilder toBuilder() { + return new PaimonSubScanBuilder() + .userName(this.userName) + .formatPlugin(this.formatPlugin) + .columns(this.columns) + .condition(this.condition) + .schema(this.schema) + .workList(this.workList) + .path(this.path) + .maxRecords(this.maxRecords); + } + + public static class PaimonSubScanBuilder { + private String userName; + + private PaimonFormatPlugin formatPlugin; + + private List columns; + + private LogicalExpression condition; + + private TupleMetadata schema; + + private List workList; + + private String path; + + private int maxRecords; + + public PaimonSubScanBuilder userName(String userName) { + this.userName = userName; + return this; + } + + public PaimonSubScanBuilder formatPlugin(PaimonFormatPlugin formatPlugin) { + this.formatPlugin = formatPlugin; + return this; + } + + public PaimonSubScanBuilder columns(List columns) { + this.columns = columns; + return this; + } + + public PaimonSubScanBuilder condition(LogicalExpression condition) { + this.condition = condition; + return this; + } + + public PaimonSubScanBuilder schema(TupleMetadata schema) { + this.schema = schema; + return this; + } + + public PaimonSubScanBuilder workList(List workList) { + this.workList = workList; + return this; + } + + public PaimonSubScanBuilder path(String path) { + this.path = path; + return this; + } + + public PaimonSubScanBuilder maxRecords(int maxRecords) { + this.maxRecords = maxRecords; + return this; + } + + public PaimonSubScan build() { + return new PaimonSubScan(this); + } + } +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java new file mode 100644 index 00000000000..6979799a3bd --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonTableUtils.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.paimon; + +import com.google.common.base.Preconditions; +import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin; +import org.apache.drill.exec.store.paimon.format.PaimonFormatLocationTransformer; +import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig; +import org.apache.drill.exec.store.paimon.format.PaimonMetadataType; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.system.SystemTableLoader; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public final class PaimonTableUtils { + private PaimonTableUtils() { + } + + /** + * Load a Paimon table directly from a filesystem path. If a metadata suffix is present, + * returns the corresponding system table; otherwise returns the data table. + */ + public static Table loadTable(PaimonFormatPlugin formatPlugin, String path) throws IOException { + PaimonMetadataType metadataType = extractMetadataType(path); + String tableLocation = metadataType == null ? path : stripMetadataType(path); + Path tablePath = new Path(tableLocation); + Options options = new Options(); + CatalogContext context = CatalogContext.create(options, formatPlugin.getFsConf()); + FileIO fileIO = FileIO.get(tablePath, context); + FileStoreTable table = FileStoreTableFactory.create(fileIO, tablePath); + // Apply time-travel and custom options at table load time. + Map dynamicOptions = buildDynamicOptions(formatPlugin.getConfig()); + if (!dynamicOptions.isEmpty()) { + table = table.copy(dynamicOptions); + } + if (metadataType == null) { + return table; + } + Table metadataTable = SystemTableLoader.load(metadataType.getName(), table); + Preconditions.checkArgument(metadataTable != null, "Unsupported metadata table: %s", metadataType.getName()); + return metadataTable; + } + + private static PaimonMetadataType extractMetadataType(String location) { + int index = location.lastIndexOf(PaimonFormatLocationTransformer.METADATA_SEPARATOR); + if (index < 0) { + return null; + } + String metadataName = location.substring(index + 1); + return PaimonMetadataType.from(metadataName); + } + + private static String stripMetadataType(String location) { + int index = location.lastIndexOf(PaimonFormatLocationTransformer.METADATA_SEPARATOR); + return index < 0 ? 
location : location.substring(0, index); + } + + private static Map buildDynamicOptions(PaimonFormatPluginConfig config) { + Map dynamicOptions = new HashMap<>(); + if (config == null) { + return dynamicOptions; + } + if (config.getProperties() != null) { + dynamicOptions.putAll(config.getProperties()); + } + + Long snapshotId = config.getSnapshotId(); + Long snapshotAsOfTime = config.getSnapshotAsOfTime(); + Preconditions.checkArgument(snapshotId == null || snapshotAsOfTime == null, + "Both 'snapshotId' and 'snapshotAsOfTime' cannot be specified"); + if (snapshotId != null) { + dynamicOptions.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_SNAPSHOT.toString()); + dynamicOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId.toString()); + } else if (snapshotAsOfTime != null) { + dynamicOptions.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_TIMESTAMP.toString()); + dynamicOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), snapshotAsOfTime.toString()); + } + + return dynamicOptions; + } +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java new file mode 100644 index 00000000000..ff06d60de89 --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/PaimonWork.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.paimon; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.exceptions.UserException; +import org.apache.paimon.table.source.Split; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Base64; +import java.util.Objects; + +@JsonSerialize(using = PaimonWork.PaimonWorkSerializer.class) +@JsonDeserialize(using = PaimonWork.PaimonWorkDeserializer.class) +public class PaimonWork { + private final Split split; + + public PaimonWork(Split split) { + this.split = split; + } + + public Split getSplit() { + return split; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonWork that = (PaimonWork) o; + return Objects.equals(split, that.split); + } + + @Override + public int hashCode() { + return Objects.hash(split); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("split", split) + .toString(); + } + + public static class PaimonWorkDeserializer extends StdDeserializer { + + private static final Logger logger = LoggerFactory.getLogger(PaimonWorkDeserializer.class); + + public PaimonWorkDeserializer() { + super(PaimonWork.class); + } + + @Override + public PaimonWork deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode node = p.getCodec().readTree(p); + String splitString = node.get(PaimonWorkSerializer.SPLIT_FIELD).asText(); + + byte[] decoded = Base64.getDecoder().decode(splitString); + try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(decoded))) { + Object split = ois.readObject(); + if (!(split instanceof Split)) { + throw UserException.dataReadError() + .message("Deserialized object is not a Paimon Split: %s", split.getClass().getName()) + .build(logger); + } + return new PaimonWork((Split) split); + } catch (ClassNotFoundException e) { + logger.error("Failed to deserialize Paimon Split: {}", e.getMessage(), e); + throw UserException.dataReadError(e) + .message("Failed to deserialize Paimon Split: %s", e.getMessage()) + .build(logger); + } + } + } + + public static class PaimonWorkSerializer extends StdSerializer { + + public static final String SPLIT_FIELD = "split"; + + public PaimonWorkSerializer() { + super(PaimonWork.class); + } + + @Override + public void serialize(PaimonWork value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeStartObject(); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(value.split); + oos.flush(); + gen.writeStringField(SPLIT_FIELD, Base64.getEncoder().encodeToString(baos.toByteArray())); + } + gen.writeEndObject(); + } + } + +} diff 
--git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java new file mode 100644 index 00000000000..43b0e588546 --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatLocationTransformer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon.format; + +import org.apache.commons.lang3.StringUtils; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FormatLocationTransformer; + +import java.util.function.Function; + +public class PaimonFormatLocationTransformer implements FormatLocationTransformer { + public static final FormatLocationTransformer INSTANCE = new PaimonFormatLocationTransformer(); + + // Paimon metadata tables are addressed via suffix: e.g. /path/to/table#snapshots + public static final String METADATA_SEPARATOR = "#"; + + @Override + public boolean canTransform(String location) { + PaimonMetadataType metadataType = getMetadataType(location); + if (metadataType == null) { + return false; + } + return true; + } + + private PaimonMetadataType getMetadataType(String location) { + String metadataType = StringUtils.substringAfterLast(location, METADATA_SEPARATOR); + if (StringUtils.isNotEmpty(metadataType)) { + return PaimonMetadataType.from(metadataType); + } + return null; + } + + @Override + public FileSelection transform(String location, Function selectionFactory) { + PaimonMetadataType metadataType = getMetadataType(location); + location = StringUtils.substringBeforeLast(location, METADATA_SEPARATOR); + FileSelection fileSelection = selectionFactory.apply(location); + if (fileSelection == null) { + return null; + } + // Preserve metadata type while keeping the base path selection. + return new PaimonMetadataFileSelection(fileSelection, metadataType); + } + +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java new file mode 100644 index 00000000000..bacdbe3c73a --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatMatcher.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon.format; + +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.FormatLocationTransformer; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.plan.rel.PluginDrillTable; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +public class PaimonFormatMatcher extends FormatMatcher { + private static final String SNAPSHOT_DIR_NAME = "snapshot"; + private static final String SCHEMA_DIR_NAME = "schema"; + + private final PaimonFormatPlugin formatPlugin; + + public PaimonFormatMatcher(PaimonFormatPlugin formatPlugin) { + this.formatPlugin = formatPlugin; + } + + @Override + public boolean supportDirectoryReads() { + return true; + } + + @Override + public DrillTable isReadable(DrillFileSystem fs, FileSelection selection, FileSystemPlugin fsPlugin, + String storageEngineName, SchemaConfig schemaConfig) throws IOException { + Path selectionRoot = selection.getSelectionRoot(); + Path snapshotDir = new Path(selectionRoot, SNAPSHOT_DIR_NAME); + Path schemaDir = new Path(selectionRoot, SCHEMA_DIR_NAME); + if (fs.isDirectory(selectionRoot) + && fs.exists(snapshotDir) && fs.isDirectory(snapshotDir) + && fs.exists(schemaDir) && fs.isDirectory(schemaDir)) { + FormatSelection formatSelection = new FormatSelection(formatPlugin.getConfig(), selection); + return new PluginDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(), + formatSelection, formatPlugin.getConvention()); + } + return null; + } + + @Override + public boolean isFileReadable(DrillFileSystem fs, FileStatus status) { + return false; + } + + @Override + public FormatPlugin getFormatPlugin() { + return formatPlugin; + } + + @Override + public int priority() { + return HIGH_PRIORITY; + } + + @Override + public FormatLocationTransformer getFormatLocationTransformer() { + return PaimonFormatLocationTransformer.INSTANCE; + } +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java new file mode 100644 index 00000000000..d778b3f88c6 --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPlugin.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon.format; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRule; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.PlannerPhase; +import org.apache.drill.exec.planner.common.DrillStatsTable; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.schema.SchemaProvider; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.PluginRulesProviderImpl; +import org.apache.drill.exec.store.StoragePluginRulesSupplier; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.drill.exec.store.paimon.PaimonGroupScan; +import org.apache.drill.exec.store.paimon.plan.PaimonPluginImplementor; +import org.apache.drill.exec.store.plan.rel.PluginRel; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +public class PaimonFormatPlugin implements FormatPlugin { + + private static final String PAIMON_CONVENTION_PREFIX = "PAIMON."; + + private static final AtomicInteger NEXT_ID = new AtomicInteger(0); + + private final FileSystemConfig storageConfig; + + private final PaimonFormatPluginConfig config; + + private final Configuration fsConf; + + private final DrillbitContext context; + + private final String name; + + private final PaimonFormatMatcher matcher; + + private final StoragePluginRulesSupplier storagePluginRulesSupplier; + + public PaimonFormatPlugin( + String name, + DrillbitContext context, + Configuration fsConf, + FileSystemConfig storageConfig, + PaimonFormatPluginConfig config) { + this.storageConfig = storageConfig; + this.config = config; + this.fsConf = fsConf; + this.context = context; + this.name = name; + this.matcher = new PaimonFormatMatcher(this); + this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement()); + } + + private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) { + Convention convention = new Convention.Impl(PAIMON_CONVENTION_PREFIX + name, PluginRel.class); + return StoragePluginRulesSupplier.builder() + .rulesProvider(new PluginRulesProviderImpl(convention, PaimonPluginImplementor::new)) + .supportsFilterPushdown(true) + .supportsProjectPushdown(true) + 
.supportsLimitPushdown(true) + .convention(convention) + .build(); + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public boolean supportsWrite() { + return false; + } + + @Override + public boolean supportsAutoPartitioning() { + return false; + } + + @Override + public FormatMatcher getMatcher() { + return matcher; + } + + @Override + public AbstractWriter getWriter(PhysicalOperator child, String location, List partitionColumns) { + throw new UnsupportedOperationException(); + } + + @Override + public Set getOptimizerRules(PlannerPhase phase) { + switch (phase) { + case PHYSICAL: + case LOGICAL: + return storagePluginRulesSupplier.getOptimizerRules(); + case LOGICAL_PRUNE_AND_JOIN: + case LOGICAL_PRUNE: + case PARTITION_PRUNING: + case JOIN_PLANNING: + default: + return Collections.emptySet(); + } + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List columns) throws IOException { + return PaimonGroupScan.builder() + .userName(userName) + .formatPlugin(this) + .path(getPath(selection)) + .columns(columns) + .maxRecords(-1) + .build(); + } + + @Override + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, + List columns, MetadataProviderManager metadataProviderManager) throws IOException { + SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider(); + TupleMetadata schema = null; + if (schemaProvider != null) { + schema = schemaProvider.read().getSchema(); + } + return PaimonGroupScan.builder() + .userName(userName) + .formatPlugin(this) + .schema(schema) + .path(getPath(selection)) + .columns(columns) + .maxRecords(-1) + .build(); + } + + @Override + public boolean supportsStatistics() { + return false; + } + + @Override + public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) { + throw new UnsupportedOperationException("unimplemented"); + } + + @Override + public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) { + throw new UnsupportedOperationException("unimplemented"); + } + + @Override + public PaimonFormatPluginConfig getConfig() { + return config; + } + + @Override + public FileSystemConfig getStorageConfig() { + return storageConfig; + } + + @Override + public Configuration getFsConf() { + return fsConf; + } + + @Override + public DrillbitContext getContext() { + return context; + } + + @Override + public String getName() { + return name; + } + + public Convention getConvention() { + return storagePluginRulesSupplier.convention(); + } + + private String getPath(FileSelection selection) { + String path = selection.getSelectionRoot().toString(); + if (selection instanceof PaimonMetadataFileSelection) { + PaimonMetadataFileSelection metadataFileSelection = (PaimonMetadataFileSelection) selection; + // Map dfs.`/path/table#snapshots` to Paimon system table. 
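+      // The suffix is carried in the group scan's path and parsed back out by PaimonTableUtils.loadTable.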
+ path = String.format("%s%s%s", path, PaimonFormatLocationTransformer.METADATA_SEPARATOR, + metadataFileSelection.getMetadataType().getName()); + } + return path; + } + +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java new file mode 100644 index 00000000000..f30c0b4f826 --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonFormatPluginConfig.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon.format; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.logical.FormatPluginConfig; + +import java.util.Map; +import java.util.Objects; + +@JsonTypeName(PaimonFormatPluginConfig.NAME) +@JsonDeserialize(builder = PaimonFormatPluginConfig.PaimonFormatPluginConfigBuilder.class) +public class PaimonFormatPluginConfig implements FormatPluginConfig { + + public static final String NAME = "paimon"; + + private final Map properties; + + // Time travel: load a specific snapshot id. + private final Long snapshotId; + + // Time travel: load the latest snapshot at or before the given timestamp (millis). 
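+  // Mutually exclusive with snapshotId; enforced in PaimonTableUtils.buildDynamicOptions.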
+ private final Long snapshotAsOfTime; + + @JsonCreator + public PaimonFormatPluginConfig(PaimonFormatPluginConfigBuilder builder) { + this.properties = builder.properties; + this.snapshotId = builder.snapshotId; + this.snapshotAsOfTime = builder.snapshotAsOfTime; + } + + public static PaimonFormatPluginConfigBuilder builder() { + return new PaimonFormatPluginConfigBuilder(); + } + + public Map getProperties() { + return properties; + } + + public Long getSnapshotId() { + return snapshotId; + } + + public Long getSnapshotAsOfTime() { + return snapshotAsOfTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonFormatPluginConfig that = (PaimonFormatPluginConfig) o; + return Objects.equals(properties, that.properties) + && Objects.equals(snapshotId, that.snapshotId) + && Objects.equals(snapshotAsOfTime, that.snapshotAsOfTime); + } + + @Override + public int hashCode() { + return Objects.hash(properties, snapshotId, snapshotAsOfTime); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("properties", properties) + .field("snapshotId", snapshotId) + .field("snapshotAsOfTime", snapshotAsOfTime) + .toString(); + } + + @JsonPOJOBuilder(withPrefix = "") + public static class PaimonFormatPluginConfigBuilder { + private Map properties; + + private Long snapshotId; + + private Long snapshotAsOfTime; + + public PaimonFormatPluginConfigBuilder properties(Map properties) { + this.properties = properties; + return this; + } + + public PaimonFormatPluginConfigBuilder snapshotId(Long snapshotId) { + this.snapshotId = snapshotId; + return this; + } + + public PaimonFormatPluginConfigBuilder snapshotAsOfTime(Long snapshotAsOfTime) { + this.snapshotAsOfTime = snapshotAsOfTime; + return this; + } + + public PaimonFormatPluginConfig build() { + return new PaimonFormatPluginConfig(this); + } + } + +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java new file mode 100644 index 00000000000..ed90d18d92f --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataFileSelection.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.paimon.format; + +import org.apache.drill.exec.store.dfs.FileSelection; + +public class PaimonMetadataFileSelection extends FileSelection { + private final PaimonMetadataType metadataType; + + protected PaimonMetadataFileSelection(FileSelection selection, PaimonMetadataType metadataType) { + super(selection); + this.metadataType = metadataType; + } + + public PaimonMetadataType getMetadataType() { + return metadataType; + } +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java new file mode 100644 index 00000000000..0cdec7345c1 --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/format/PaimonMetadataType.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon.format; + +import java.util.Locale; + +/** + * Paimon system tables exposed via table path suffix, e.g. /path/table#snapshots. + */ +public enum PaimonMetadataType { + SNAPSHOTS("snapshots"), + SCHEMAS("schemas"), + FILES("files"), + MANIFESTS("manifests"); + + private final String name; + + PaimonMetadataType(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static PaimonMetadataType from(String value) { + if (value == null) { + return null; + } + String normalized = value.toLowerCase(Locale.ROOT); + for (PaimonMetadataType type : values()) { + if (type.name.equals(normalized)) { + return type; + } + } + return null; + } + +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java new file mode 100644 index 00000000000..16942fe4c8e --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/DrillExprToPaimonTranslator.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.plan;
+
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.NullExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Optional;
+
+public class DrillExprToPaimonTranslator
+    extends AbstractExprVisitor<Object, DrillExprToPaimonTranslator.Context, RuntimeException> {
+
+  // Translate Drill logical expressions into Paimon predicates for filter pushdown.
+  public static final ExprVisitor<Object, Context, RuntimeException> INSTANCE =
+    new DrillExprToPaimonTranslator();
+
+  public static Predicate translate(LogicalExpression expression, RowType rowType) {
+    if (expression == null || rowType == null) {
+      return null;
+    }
+    Object value = expression.accept(INSTANCE, new Context(rowType));
+    return value instanceof Predicate ? (Predicate) value : null;
+  }
+
+  @Override
+  public Object visitBooleanOperator(BooleanOperator op, Context context) {
+    if (!FunctionNames.AND.equals(op.getName()) && !FunctionNames.OR.equals(op.getName())) {
+      return null;
+    }
+    Predicate result = null;
+    for (LogicalExpression arg : op.args()) {
+      Predicate predicate = asPredicate(arg.accept(this, context));
+      if (predicate == null) {
+        return null;
+      }
+      if (result == null) {
+        result = predicate;
+      } else {
+        result = FunctionNames.AND.equals(op.getName())
+          ?
PredicateBuilder.and(result, predicate) + : PredicateBuilder.or(result, predicate); + } + } + return result; + } + + @Override + public Object visitFunctionCall(FunctionCall call, Context context) { + switch (call.getName()) { + case FunctionNames.AND: { + Predicate left = asPredicate(call.arg(0).accept(this, context)); + Predicate right = asPredicate(call.arg(1).accept(this, context)); + if (left != null && right != null) { + return PredicateBuilder.and(left, right); + } + return null; + } + case FunctionNames.OR: { + Predicate left = asPredicate(call.arg(0).accept(this, context)); + Predicate right = asPredicate(call.arg(1).accept(this, context)); + if (left != null && right != null) { + return PredicateBuilder.or(left, right); + } + return null; + } + case FunctionNames.NOT: { + Predicate predicate = asPredicate(call.arg(0).accept(this, context)); + if (predicate == null) { + return null; + } + Optional negated = predicate.negate(); + return negated.orElse(null); + } + case FunctionNames.IS_NULL: { + Object arg = call.arg(0).accept(this, context); + if (arg instanceof SchemaPath) { + return buildIsNullPredicate(context, (SchemaPath) arg, true); + } + return null; + } + case FunctionNames.IS_NOT_NULL: { + Object arg = call.arg(0).accept(this, context); + if (arg instanceof SchemaPath) { + return buildIsNullPredicate(context, (SchemaPath) arg, false); + } + return null; + } + case FunctionNames.LT: + return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.LT); + case FunctionNames.LE: + return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.LE); + case FunctionNames.GT: + return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.GT); + case FunctionNames.GE: + return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.GE); + case FunctionNames.EQ: + return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.EQ); + case FunctionNames.NE: + return buildComparisonPredicate(context, call.arg(0), call.arg(1), Comparison.NE); + default: + return null; + } + } + + @Override + public Object visitSchemaPath(SchemaPath path, Context context) { + return path; + } + + @Override + public Object visitBooleanConstant(ValueExpressions.BooleanExpression e, Context context) { + return new LiteralValue(e.getBoolean()); + } + + @Override + public Object visitFloatConstant(ValueExpressions.FloatExpression fExpr, Context context) { + return new LiteralValue(fExpr.getFloat()); + } + + @Override + public Object visitIntConstant(ValueExpressions.IntExpression intExpr, Context context) { + return new LiteralValue(intExpr.getInt()); + } + + @Override + public Object visitLongConstant(ValueExpressions.LongExpression longExpr, Context context) { + return new LiteralValue(longExpr.getLong()); + } + + @Override + public Object visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Context context) { + return new LiteralValue(dExpr.getDouble()); + } + + @Override + public Object visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Context context) { + return new LiteralValue(BigDecimal.valueOf(decExpr.getIntFromDecimal(), decExpr.getScale())); + } + + @Override + public Object visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Context context) { + return new LiteralValue(BigDecimal.valueOf(decExpr.getLongFromDecimal(), decExpr.getScale())); + } + + @Override + public Object visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Context context) { + return new 
LiteralValue(decExpr.getBigDecimal()); + } + + @Override + public Object visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Context context) { + return new LiteralValue(decExpr.getBigDecimal()); + } + + @Override + public Object visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Context context) { + return new LiteralValue(decExpr.getBigDecimal()); + } + + @Override + public Object visitDateConstant(ValueExpressions.DateExpression dateExpr, Context context) { + return new LiteralValue(new Date(dateExpr.getDate())); + } + + @Override + public Object visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Context context) { + return new LiteralValue(new Time(timeExpr.getTime())); + } + + @Override + public Object visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Context context) { + return new LiteralValue(new Timestamp(timestampExpr.getTimeStamp())); + } + + @Override + public Object visitQuotedStringConstant(ValueExpressions.QuotedString e, Context context) { + return new LiteralValue(e.getString()); + } + + @Override + public Object visitNullExpression(NullExpression e, Context context) { + return new LiteralValue(null); + } + + @Override + public Object visitUnknown(LogicalExpression e, Context context) { + return null; + } + + private Predicate buildComparisonPredicate(Context context, LogicalExpression leftExpr, + LogicalExpression rightExpr, Comparison comparison) { + Object left = leftExpr.accept(this, context); + Object right = rightExpr.accept(this, context); + return buildComparisonPredicate(context, left, right, comparison); + } + + private Predicate buildComparisonPredicate(Context context, Object left, Object right, Comparison comparison) { + if (left instanceof SchemaPath && right instanceof LiteralValue) { + return buildPredicate(context, (SchemaPath) left, comparison, (LiteralValue) right); + } + if (right instanceof SchemaPath && left instanceof LiteralValue) { + return buildPredicate(context, (SchemaPath) right, comparison.flip(), (LiteralValue) left); + } + return null; + } + + private Predicate buildPredicate(Context context, SchemaPath path, Comparison comparison, LiteralValue literalValue) { + int index = columnIndex(context, path); + if (index < 0) { + return null; + } + Object value = literalValue.value(); + if (value == null) { + return null; + } + DataField field = context.rowType.getFields().get(index); + Object internalValue; + try { + internalValue = PredicateBuilder.convertJavaObject(field.type(), value); + } catch (RuntimeException e) { + return null; + } + switch (comparison) { + case EQ: + return context.predicateBuilder.equal(index, internalValue); + case NE: + return context.predicateBuilder.notEqual(index, internalValue); + case LT: + return context.predicateBuilder.lessThan(index, internalValue); + case LE: + return context.predicateBuilder.lessOrEqual(index, internalValue); + case GT: + return context.predicateBuilder.greaterThan(index, internalValue); + case GE: + return context.predicateBuilder.greaterOrEqual(index, internalValue); + default: + return null; + } + } + + private Predicate buildIsNullPredicate(Context context, SchemaPath path, boolean isNull) { + int index = columnIndex(context, path); + if (index < 0) { + return null; + } + return isNull + ? 
context.predicateBuilder.isNull(index) + : context.predicateBuilder.isNotNull(index); + } + + private int columnIndex(Context context, SchemaPath path) { + PathSegment segment = path.getRootSegment(); + if (segment == null || !segment.isNamed() || segment.getChild() != null) { + return -1; + } + return context.predicateBuilder.indexOf(segment.getNameSegment().getPath()); + } + + private Predicate asPredicate(Object value) { + return value instanceof Predicate ? (Predicate) value : null; + } + + private enum Comparison { + EQ, + NE, + LT, + LE, + GT, + GE; + + public Comparison flip() { + switch (this) { + case LT: + return GT; + case LE: + return GE; + case GT: + return LT; + case GE: + return LE; + case EQ: + case NE: + default: + return this; + } + } + } + + public static class Context { + private final RowType rowType; + private final PredicateBuilder predicateBuilder; + + public Context(RowType rowType) { + this.rowType = rowType; + this.predicateBuilder = new PredicateBuilder(rowType); + } + } + + private static class LiteralValue { + private final Object value; + + private LiteralValue(Object value) { + this.value = value; + } + + private Object value() { + return value; + } + } +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java new file mode 100644 index 00000000000..e1af514428a --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/plan/PaimonPluginImplementor.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.store.paimon.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.paimon.PaimonGroupScan;
+import org.apache.drill.exec.store.paimon.PaimonTableUtils;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PaimonPluginImplementor extends AbstractPluginImplementor {
+
+  private PaimonGroupScan groupScan;
+
+  @Override
+  public void implement(StoragePluginTableScan scan) {
+    groupScan = (PaimonGroupScan) scan.getGroupScan();
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    visitChild(filter.getInput());
+
+    RexNode condition = filter.getCondition();
+    LogicalExpression expression = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      condition);
+    groupScan = groupScan.toBuilder().condition(expression).build();
+  }
+
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    visitChild(project.getInput());
+
+    DrillParseContext context = new DrillParseContext(PrelUtil.getPlannerSettings(project.getCluster().getPlanner()));
+    RelNode input = project.getInput();
+
+    List<SchemaPath> projects = project.getProjects().stream()
+      .map(e -> (SchemaPath) DrillOptiq.toDrill(context, input, e))
+      .collect(Collectors.toList());
+    groupScan = groupScan.clone(projects);
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    visitChild(limit.getInput());
+    int maxRecords = getArtificialLimit(limit);
+    if (maxRecords >= 0) {
+      groupScan = groupScan.applyLimit(maxRecords);
+    }
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    RexNode condition = filter.getCondition();
+    LogicalExpression logicalExpression = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      condition);
+    GroupScan scan = findGroupScan(filter);
+    if (!(scan instanceof PaimonGroupScan)) {
+      return false;
+    }
+    PaimonGroupScan paimonGroupScan = (PaimonGroupScan) scan;
+    try {
+      Table table = PaimonTableUtils.loadTable(paimonGroupScan.getFormatPlugin(), paimonGroupScan.getPath());
+      RowType rowType = table.rowType();
+      return
DrillExprToPaimonTranslator.translate(logicalExpression, rowType) != null; + } catch (IOException e) { + return false; + } + } + + @Override + public boolean canImplement(DrillLimitRelBase limit) { + if (hasPluginGroupScan(limit)) { + FirstLimitFinder finder = new FirstLimitFinder(); + limit.getInput().accept(finder); + int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset()); + int newLimit = getArtificialLimit(limit); + return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit); + } + return false; + } + + @Override + public boolean artificialLimit() { + return true; + } + + @Override + public boolean canImplement(Project project) { + return hasPluginGroupScan(project); + } + + @Override + public boolean splitProject(Project project) { + return true; + } + + @Override + protected boolean hasPluginGroupScan(RelNode node) { + return findGroupScan(node) instanceof PaimonGroupScan; + } + + @Override + public GroupScan getPhysicalOperator() { + return groupScan; + } + + @Override + protected Class supportedPlugin() { + return FileSystemPlugin.class; + } + + private int rexLiteralIntValue(RexLiteral offset) { + return ((BigDecimal) offset.getValue()).intValue(); + } + + private int getArtificialLimit(DrillLimitRelBase limit) { + return getArtificialLimit(limit.getFetch(), limit.getOffset()); + } + + private int getArtificialLimit(RexNode fetch, RexNode offset) { + int maxRows = -1; + if (fetch != null) { + maxRows = rexLiteralIntValue((RexLiteral) fetch); + if (offset != null) { + maxRows += rexLiteralIntValue((RexLiteral) offset); + } + } + return maxRows; + } + + private static class FirstLimitFinder extends RelShuttleImpl { + private RexNode fetch; + + private RexNode offset; + + @Override + public RelNode visit(RelNode other) { + if (other instanceof DrillLimitRelBase) { + DrillLimitRelBase limitRelBase = (DrillLimitRelBase) other; + fetch = limitRelBase.getFetch(); + offset = limitRelBase.getOffset(); + return other; + } else if (other instanceof RelSubset) { + RelSubset relSubset = (RelSubset) other; + Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this); + } + return super.visit(other); + } + + public RexNode getFetch() { + return fetch; + } + + public RexNode getOffset() { + return offset; + } + } +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java new file mode 100644 index 00000000000..1b411e3ef95 --- /dev/null +++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonColumnConverterFactory.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.paimon.read; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.record.ColumnConverter; +import org.apache.drill.exec.record.ColumnConverterFactory; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.exec.vector.accessor.ValueWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.LocalZoneTimestamp; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.RowType; + +import java.math.BigDecimal; +import java.time.Instant; + +public class PaimonColumnConverterFactory extends ColumnConverterFactory { + + public PaimonColumnConverterFactory(TupleMetadata providedSchema) { + super(providedSchema); + } + + @Override + public ColumnConverter.ScalarColumnConverter buildScalar(ColumnMetadata readerSchema, ValueWriter writer) { + switch (readerSchema.type()) { + case TIMESTAMP: + case TIMESTAMPTZ: + return new ColumnConverter.ScalarColumnConverter(value -> writer.setTimestamp(asInstant(value))); + case VARDECIMAL: + return new ColumnConverter.ScalarColumnConverter(value -> writer.setDecimal(asBigDecimal(value))); + case VARCHAR: + return new ColumnConverter.ScalarColumnConverter(value -> writer.setString(asString(value))); + case VARBINARY: + return new ColumnConverter.ScalarColumnConverter(value -> { + byte[] bytes = (byte[]) value; + writer.setBytes(bytes, bytes.length); + }); + default: + return super.buildScalar(readerSchema, writer); + } + } + + public static ColumnMetadata getColumnMetadata(DataField field) { + DataType type = field.type(); + String name = field.name(); + TypeProtos.DataMode dataMode = type.isNullable() + ? 
TypeProtos.DataMode.OPTIONAL + : TypeProtos.DataMode.REQUIRED; + return getColumnMetadata(name, type, dataMode); + } + + public static TupleSchema convertSchema(RowType rowType) { + TupleSchema schema = new TupleSchema(); + for (DataField field : rowType.getFields()) { + ColumnMetadata columnMetadata = getColumnMetadata(field); + schema.add(columnMetadata); + } + return schema; + } + + private static ColumnMetadata getColumnMetadata(String name, DataType type, TypeProtos.DataMode dataMode) { + DataTypeRoot typeRoot = type.getTypeRoot(); + switch (typeRoot) { + case ARRAY: + case MAP: + case MULTISET: + case ROW: + case VARIANT: + throw new UnsupportedOperationException(String.format("Unsupported type: %s for column: %s", typeRoot, name)); + default: + return getPrimitiveMetadata(name, type, dataMode); + } + } + + private static ColumnMetadata getPrimitiveMetadata(String name, DataType type, TypeProtos.DataMode dataMode) { + TypeProtos.MinorType minorType = getType(type.getTypeRoot()); + if (minorType == null) { + throw new UnsupportedOperationException(String.format("Unsupported type: %s for column: %s", type.getTypeRoot(), name)); + } + TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder() + .setMinorType(minorType) + .setMode(dataMode); + if (type.getTypeRoot() == DataTypeRoot.DECIMAL) { + DecimalType decimalType = (DecimalType) type; + builder.setScale(decimalType.getScale()) + .setPrecision(decimalType.getPrecision()); + } + MaterializedField materializedField = MaterializedField.create(name, builder.build()); + return MetadataUtils.fromField(materializedField); + } + + private static TypeProtos.MinorType getType(DataTypeRoot typeRoot) { + switch (typeRoot) { + case BOOLEAN: + return TypeProtos.MinorType.BIT; + case TINYINT: + return TypeProtos.MinorType.TINYINT; + case SMALLINT: + return TypeProtos.MinorType.SMALLINT; + case INTEGER: + return TypeProtos.MinorType.INT; + case BIGINT: + return TypeProtos.MinorType.BIGINT; + case FLOAT: + return TypeProtos.MinorType.FLOAT4; + case DOUBLE: + return TypeProtos.MinorType.FLOAT8; + case DATE: + return TypeProtos.MinorType.DATE; + case TIME_WITHOUT_TIME_ZONE: + return TypeProtos.MinorType.TIME; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return TypeProtos.MinorType.TIMESTAMP; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return TypeProtos.MinorType.TIMESTAMPTZ; + case CHAR: + case VARCHAR: + return TypeProtos.MinorType.VARCHAR; + case BINARY: + case VARBINARY: + return TypeProtos.MinorType.VARBINARY; + case DECIMAL: + return TypeProtos.MinorType.VARDECIMAL; + default: + return null; + } + } + + private static String asString(Object value) { + if (value instanceof BinaryString) { + return value.toString(); + } + return (String) value; + } + + private static BigDecimal asBigDecimal(Object value) { + if (value instanceof Decimal) { + return ((Decimal) value).toBigDecimal(); + } + return (BigDecimal) value; + } + + private static Instant asInstant(Object value) { + if (value instanceof Timestamp) { + return ((Timestamp) value).toInstant(); + } + if (value instanceof LocalZoneTimestamp) { + return ((LocalZoneTimestamp) value).toInstant(); + } + if (value instanceof java.sql.Timestamp) { + return ((java.sql.Timestamp) value).toInstant(); + } + if (value instanceof Long) { + return Instant.ofEpochMilli((Long) value); + } + return (Instant) value; + } +} diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java 
b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java
new file mode 100644
index 00000000000..312838556dd
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonRecordReader.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.ColumnConverterFactory;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.paimon.PaimonTableUtils;
+import org.apache.drill.exec.store.paimon.PaimonWork;
+import org.apache.drill.exec.store.paimon.PaimonReadUtils;
+import org.apache.drill.exec.store.paimon.format.PaimonFormatPlugin;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PaimonRecordReader implements ManagedReader {
+  private static final Logger logger = LoggerFactory.getLogger(PaimonRecordReader.class);
+
+  private final PaimonFormatPlugin formatPlugin;
+
+  private final String path;
+
+  private final List<SchemaPath> columns;
+
+  private final LogicalExpression condition;
+
+  private final PaimonWork work;
+
+  private final int maxRecords;
+
+  private ResultSetLoader loader;
+
+  private ColumnConverter[] converters;
+
+  private InternalRow.FieldGetter[] getters;
+
+  private RecordReader<InternalRow> recordReader;
+
+  private RecordReader.RecordIterator<InternalRow> currentBatch;
+
+  private OperatorStats stats;
+
+  private int lastSchemaVersion = -1;
+
+  public PaimonRecordReader(PaimonFormatPlugin formatPlugin, String path,
+      List<SchemaPath> columns, LogicalExpression condition, PaimonWork work, int maxRecords,
+      SchemaNegotiator negotiator) {
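+    // Setup flow: load the Paimon table, push the filter and projection into its
+    // ReadBuilder, negotiate the merged schema with Drill, then open a reader for
+    // this split.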
+    this.formatPlugin = formatPlugin;
+    this.path = path;
+    this.columns = columns;
+    this.condition = condition;
+    this.work = work;
+    this.maxRecords = maxRecords;
+    try {
+      Table table = PaimonTableUtils.loadTable(formatPlugin, path);
+      RowType rowType = table.rowType();
+      ReadBuilder readBuilder = table.newReadBuilder();
+      PaimonReadUtils.applyFilter(readBuilder, rowType, condition);
+      PaimonReadUtils.applyProjection(readBuilder, rowType, columns);
+      RowType readType = readBuilder.readType();
+
+      TupleSchema tableSchema = PaimonColumnConverterFactory.convertSchema(readType);
+      TupleMetadata providedSchema = negotiator.providedSchema();
+      TupleMetadata tupleSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, tableSchema);
+      negotiator.tableSchema(tupleSchema, true);
+      loader = negotiator.build();
+
+      converters = buildConverters(providedSchema, tableSchema);
+      getters = buildGetters(readType);
+
+      recordReader = readBuilder.newRead().executeFilter().createReader(work.getSplit());
+      currentBatch = null;
+      stats = negotiator.context().getStats();
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+        .message("Failed to open Paimon reader for %s", path)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public boolean next() {
+    RowSetLoader rowWriter = loader.writer();
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        updateStats(rowWriter);
+        return false;
+      }
+    }
+    updateStats(rowWriter);
+    return true;
+  }
+
+  @Override
+  public void close() {
+    if (currentBatch != null) {
+      currentBatch.releaseBatch();
+    }
+    AutoCloseables.closeSilently(recordReader);
+    if (loader != null) {
+      loader.close();
+    }
+  }
+
+  private boolean nextLine(RowSetLoader rowWriter) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
+    InternalRow row = nextRow();
+    if (row == null) {
+      return false;
+    }
+
+    rowWriter.start();
+    for (int i = 0; i < getters.length; i++) {
+      converters[i].convert(getters[i].getFieldOrNull(row));
+    }
+    rowWriter.save();
+
+    return true;
+  }
+
+  private InternalRow nextRow() {
+    try {
+      while (true) {
+        if (currentBatch == null) {
+          currentBatch = recordReader.readBatch();
+          if (currentBatch == null) {
+            return null;
+          }
+        }
+        InternalRow row = currentBatch.next();
+        if (row != null) {
+          return row;
+        }
+        currentBatch.releaseBatch();
+        currentBatch = null;
+      }
+    } catch (IOException e) {
+      throw UserException.dataReadError(e)
+        .message("Failed to read Paimon data for %s", path)
+        .build(logger);
+    }
+  }
+
+  private void updateStats(RowSetLoader rowWriter) {
+    if (stats == null) {
+      return;
+    }
+    int rowCount = rowWriter.rowCount();
+    if (rowCount == 0) {
+      return;
+    }
+    int schemaVersion = loader.schemaVersion();
+    boolean isNewSchema = schemaVersion != lastSchemaVersion;
+    lastSchemaVersion = schemaVersion;
+    stats.batchReceived(0, rowCount, isNewSchema);
+  }
+
+  private ColumnConverter[] buildConverters(TupleMetadata providedSchema, TupleSchema tableSchema) {
+    ColumnConverterFactory factory = new PaimonColumnConverterFactory(providedSchema);
+    ColumnConverter[] columnConverters = new ColumnConverter[tableSchema.size()];
+    for (int i = 0; i < tableSchema.size(); i++) {
+      ColumnMetadata columnMetadata = tableSchema.metadata(i);
+      columnConverters[i] = factory.getConverter(providedSchema, columnMetadata,
+        loader.writer().column(columnMetadata.name()));
+    }
+    return columnConverters;
+  }
+
+  private InternalRow.FieldGetter[] buildGetters(RowType readType) {
+    List<DataField> fields = readType.getFields();
+    InternalRow.FieldGetter[] fieldGetters = new InternalRow.FieldGetter[fields.size()];
+    for (int i = 0; i < fields.size(); i++) {
+      fieldGetters[i] = InternalRow.createFieldGetter(fields.get(i).type(), i);
+    }
+    return fieldGetters;
+  }
+}
diff --git a/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java
new file mode 100644
index 00000000000..ee3e2f5af69
--- /dev/null
+++ b/contrib/format-paimon/src/main/java/org/apache/drill/exec/store/paimon/read/PaimonScanBatchCreator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.paimon.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.paimon.PaimonSubScan;
+import org.apache.drill.exec.store.paimon.PaimonWork;
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class PaimonScanBatchCreator implements BatchCreator<PaimonSubScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+      PaimonSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    try {
+      ScanLifecycleBuilder builder = createBuilder(subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      throw e;
+    } catch (Throwable e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private ScanLifecycleBuilder createBuilder(PaimonSubScan subScan) {
+    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
+    builder.projection(subScan.getColumns());
+    builder.userName(subScan.getUserName());
+    builder.providedSchema(subScan.getSchema());
+    builder.readerFactory(new PaimonReaderFactory(subScan));
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
+  }
+
+  private static class PaimonReaderFactory implements ReaderFactory<SchemaNegotiator> {
+    private final PaimonSubScan subScan;
+    private final Iterator<PaimonWork> workIterator;
+
+    private
PaimonReaderFactory(PaimonSubScan subScan) { + this.subScan = subScan; + this.workIterator = subScan.getWorkList().iterator(); + } + + @Override + public boolean hasNext() { + return workIterator.hasNext(); + } + + @Override + public ManagedReader next(SchemaNegotiator negotiator) { + return new PaimonRecordReader(subScan.getFormatPlugin(), subScan.getPath(), + subScan.getColumns(), subScan.getCondition(), workIterator.next(), subScan.getMaxRecords(), + negotiator); + } + } +} diff --git a/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json b/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json new file mode 100644 index 00000000000..786d5fd7475 --- /dev/null +++ b/contrib/format-paimon/src/main/resources/bootstrap-format-plugins.json @@ -0,0 +1,20 @@ +{ + "storage":{ + "dfs": { + "type": "file", + "formats": { + "paimon": { + "type": "paimon" + } + } + }, + "s3": { + "type": "file", + "formats": { + "paimon": { + "type": "paimon" + } + } + } + } +} diff --git a/contrib/format-paimon/src/main/resources/drill-module.conf b/contrib/format-paimon/src/main/resources/drill-module.conf new file mode 100644 index 00000000000..44156959a37 --- /dev/null +++ b/contrib/format-paimon/src/main/resources/drill-module.conf @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file tells Drill to consider this module when class path scanning. +# This file can also include any supplementary configuration information. +# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. + +drill.classpath.scanning: { + packages += "org.apache.drill.exec.store.paimon" +} diff --git a/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java b/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java new file mode 100644 index 00000000000..1a500ca4f2b --- /dev/null +++ b/contrib/format-paimon/src/test/java/org/apache/drill/exec/store/paimon/PaimonQueriesTest.java @@ -0,0 +1,679 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.paimon; + +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.common.logical.security.PlainCredentialsProvider; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.physical.rowSet.RowSetBuilder; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.paimon.format.PaimonFormatPluginConfig; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.hadoop.conf.Configuration; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.DataTypes; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.StringContains.containsString; +import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class PaimonQueriesTest extends ClusterTest { + + private static final String DB_NAME = "default"; + private static final String TABLE_NAME = "append_table"; + private static final String PK_TABLE_NAME = "pk_table"; + private static final String PAIMON_SCAN_PATTERN = "(PAIMON_GROUP_SCAN|PaimonGroupScan)"; + private static String tableRelativePath; + private static String pkTableRelativePath; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher)); + + StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage(); + FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig(); + Map formats = new HashMap<>(pluginConfig.getFormats()); + formats.put("paimon", PaimonFormatPluginConfig.builder().build()); + FileSystemConfig newPluginConfig = new FileSystemConfig( + pluginConfig.getConnection(), + pluginConfig.getConfig(), + pluginConfig.getWorkspaces(), + formats, + PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER); + newPluginConfig.setEnabled(pluginConfig.isEnabled()); + pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig); + + tableRelativePath = createAppendTable(); + pkTableRelativePath = createPrimaryKeyTable(); + } + + @Test + public void 
testReadAppendTable() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s`", tableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testReadPrimaryKeyTable() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s`", pkTableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "dave") + .addRow(2, "erin") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testProjectionPushdown() throws Exception { + String query = String.format("select name from dfs.tmp.`%s`", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*columns=\\[.*name.*\\]") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow("alice") + .addRow("bob") + .addRow("carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testMultiColumnProjection() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s`", tableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdown() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where 
id = 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownGT() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id > 1", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownLT() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id < 3", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*3") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownGE() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id >= 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) 
+ .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownLE() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id <= 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownNE() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id <> 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownAnd() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id > 1 and id < 3", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanAnd") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testFilterPushdownOr() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id = 1 or id = 3", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*booleanOr") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = 
results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testFilterPushdownNot() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where not (id = 2)", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*2") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testLimitPushdown() throws Exception { + String query = String.format("select id from dfs.tmp.`%s` limit 2", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*maxRecords=2") + .match(true); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(2, results.rowCount()); + results.clear(); + } + + @Test + public void testCombinedPushdownFilterProjectionLimit() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` where id > 1 limit 1", tableRelativePath); + + queryBuilder() + .sql(query) + .planMatcher() + .include(PAIMON_SCAN_PATTERN + ".*condition=.*id.*1") + .include(PAIMON_SCAN_PATTERN + ".*maxRecords=1") + .match(true); + + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(2, "bob") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSelectWildcard() throws Exception { + String query = String.format("select * from dfs.tmp.`%s`", tableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, 
actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(1, "alice") + .addRow(2, "bob") + .addRow(3, "carol") + .build(); + new RowSetComparison(expected).unorderedVerifyAndClearAll(results); + } + + @Test + public void testSelectWithOrderBy() throws Exception { + String query = String.format("select id, name from dfs.tmp.`%s` order by id desc", tableRelativePath); + RowSet results = queryBuilder().sql(query).rowSet(); + TupleMetadata actualSchema = results.schema(); + assertEquals(TypeProtos.MinorType.INT, actualSchema.metadata("id").type()); + assertEquals(TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").type()); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("id", TypeProtos.MinorType.INT, actualSchema.metadata("id").mode()) + .add("name", TypeProtos.MinorType.VARCHAR, actualSchema.metadata("name").mode()) + .buildSchema(); + RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) + .addRow(3, "carol") + .addRow(2, "bob") + .addRow(1, "alice") + .build(); + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSelectWithCount() throws Exception { + String query = String.format("select count(*) from dfs.tmp.`%s`", tableRelativePath); + + assertEquals(3, queryBuilder().sql(query).singletonLong()); + } + + @Test + public void testSerDe() throws Exception { + String snapshotQuery = String.format( + "select snapshot_id from dfs.tmp.`%s#snapshots` order by commit_time limit 1", tableRelativePath); + + long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong(); + String sql = String.format( + "select count(*) as cnt from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => %d))", + tableRelativePath, snapshotId); + String plan = queryBuilder().sql(sql).explainJson(); + long count = queryBuilder().physical(plan).singletonLong(); + assertEquals(2, count); + } + + @Test + public void testInvalidColumnName() throws Exception { + String query = String.format("select id, invalid_column from dfs.tmp.`%s`", tableRelativePath); + try { + queryBuilder().sql(query).run(); + fail("Expected UserRemoteException for invalid column name"); + } catch (UserRemoteException e) { + assertThat(e.getVerboseMessage(), containsString("invalid_column")); + } + } + + @Test + public void testSelectWithSnapshotId() throws Exception { + String snapshotQuery = String.format( + "select snapshot_id from dfs.tmp.`%s#snapshots` order by commit_time limit 1", tableRelativePath); + + long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong(); + String query = String.format( + "select id, name from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => %d))", + tableRelativePath, snapshotId); + long count = queryBuilder().sql(query).run().recordCount(); + assertEquals(2, count); + } + + @Test + public void testSelectWithSnapshotAsOfTime() throws Exception { + String snapshotQuery = String.format( + "select commit_time from dfs.tmp.`%s#snapshots` order by commit_time limit 1", tableRelativePath); + + long snapshotTime = queryBuilder().sql(snapshotQuery).singletonLong(); + String query = String.format( + "select id, name from table(dfs.tmp.`%s`(type => 'paimon', snapshotAsOfTime => %d))", + tableRelativePath, snapshotTime); + long count = 
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectWithSnapshotIdAndSnapshotAsOfTime() throws Exception {
+    String query = String.format(
+      "select * from table(dfs.tmp.`%s`(type => 'paimon', snapshotId => %d, snapshotAsOfTime => %d))",
+      tableRelativePath, 123, 456);
+    try {
+      queryBuilder().sql(query).run();
+      fail("Expected UserRemoteException when both snapshotId and snapshotAsOfTime are specified");
+    } catch (UserRemoteException e) {
+      assertThat(e.getVerboseMessage(),
+        containsString("Both 'snapshotId' and 'snapshotAsOfTime' cannot be specified"));
+    }
+  }
+
+  @Test
+  public void testSelectSnapshotsMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#snapshots`", tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectSchemasMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#schemas`", tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testSelectFilesMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#files`", tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testSelectManifestsMetadata() throws Exception {
+    String query = String.format("select * from dfs.tmp.`%s#manifests`", tableRelativePath);
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(2, count);
+  }
+
+  private static String createAppendTable() throws Exception {
+    Path dfsRoot = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath());
+    Path warehouseDir = dfsRoot.resolve("paimon_warehouse");
+
+    Options options = new Options();
+    options.set("warehouse", warehouseDir.toUri().toString());
+    options.set("metastore", "filesystem");
+
+    CatalogContext context = CatalogContext.create(options, new Configuration());
+    try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+      catalog.createDatabase(DB_NAME, true);
+
+      Schema schema = Schema.newBuilder()
+        .column("id", DataTypes.INT())
+        .column("name", DataTypes.STRING())
+        .build();
+      Identifier identifier = Identifier.create(DB_NAME, TABLE_NAME);
+      catalog.createTable(identifier, schema, false);
+
+      Table table = catalog.getTable(identifier);
+      writeRows(table, Arrays.asList(
+        GenericRow.of(1, BinaryString.fromString("alice")),
+        GenericRow.of(2, BinaryString.fromString("bob"))
+      ));
+      writeRows(table, Arrays.asList(
+        GenericRow.of(3, BinaryString.fromString("carol"))
+      ));
+    }
+
+    Path tablePath = warehouseDir.resolve(DB_NAME + ".db").resolve(TABLE_NAME);
+    Path relativePath = dfsRoot.relativize(tablePath);
+    return relativePath.toString().replace('\\', '/');
+  }
+
+  private static String createPrimaryKeyTable() throws Exception {
+    Path dfsRoot = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath());
+    Path warehouseDir = dfsRoot.resolve("paimon_warehouse");
+
+    Options options = new Options();
+    options.set("warehouse", warehouseDir.toUri().toString());
+    options.set("metastore", "filesystem");
+
+    CatalogContext context = CatalogContext.create(options, new Configuration());
+    try (Catalog catalog = CatalogFactory.createCatalog(context)) {
+      catalog.createDatabase(DB_NAME, true);
+
+      Schema schema = Schema.newBuilder()
+        .column("id", DataTypes.INT())
+        .column("name", DataTypes.STRING())
+        .option(CoreOptions.BUCKET.key(), "1")
+        .primaryKey("id")
+        .build();
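+      // the primary-key table shares the same warehouse and filesystem catalog as the append table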
+      Identifier identifier = Identifier.create(DB_NAME, PK_TABLE_NAME);
+      catalog.createTable(identifier, schema, false);
+
+      Table table = catalog.getTable(identifier);
+      writeRows(table, Arrays.asList(
+        GenericRow.of(1, BinaryString.fromString("dave")),
+        GenericRow.of(2, BinaryString.fromString("erin"))
+      ));
+    }
+
+    Path tablePath = warehouseDir.resolve(DB_NAME + ".db").resolve(PK_TABLE_NAME);
+    Path relativePath = dfsRoot.relativize(tablePath);
+    return relativePath.toString().replace('\\', '/');
+  }
+
+  private static void writeRows(Table table, List<GenericRow> rows) throws Exception {
+    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+    List<CommitMessage> messages;
+    try (BatchTableWrite write = writeBuilder.newWrite()) {
+      for (GenericRow row : rows) {
+        write.write(row);
+      }
+      messages = write.prepareCommit();
+    }
+    try (BatchTableCommit commit = writeBuilder.newCommit()) {
+      commit.commit(messages);
+    }
+  }
+
+}
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 50ccdc90dd0..4c1e0bfc19c 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -51,6 +51,7 @@
     <module>format-image</module>
     <module>format-log</module>
     <module>format-ltsv</module>
+    <module>format-paimon</module>
     <module>format-pcapng</module>
     <module>format-pdf</module>
     <module>format-sas</module>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 23119c241ed..3891327df3d 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -521,6 +521,11 @@
       <artifactId>drill-iceberg-format</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.drill.contrib</groupId>
+      <artifactId>drill-paimon-format</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.drill.contrib</groupId>
       <artifactId>drill-deltalake-format</artifactId>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 66792fd43f6..d30caef9a8b 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -48,6 +48,7 @@
       <include>org.apache.drill.contrib:drill-kudu-storage:jar</include>
       <include>org.apache.drill.contrib:drill-mongo-storage:jar</include>
      <include>org.apache.drill.contrib:drill-opentsdb-storage:jar</include>
+      <include>org.apache.drill.contrib:drill-paimon-format:jar</include>
      <include>org.apache.drill.contrib:drill-storage-cassandra:jar</include>
      <include>org.apache.drill.contrib:drill-storage-elasticsearch:jar</include>
      <include>org.apache.drill.contrib:drill-storage-googlesheets:jar</include>
diff --git a/pom.xml b/pom.xml
index f229eb67d0f..c8c88abb90c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,7 @@
     0.25
     4.9.3
+    <apiguardian.version>1.1.2</apiguardian.version>
     9.5
     1.23.0
     1.12.0
@@ -93,6 +94,7 @@
     4.5.14
     5.11.0
     0.12.1
+    <paimon.version>1.3.1</paimon.version>
     2.18.3
     3.1.12
     3.29.2-GA
@@ -1008,6 +1010,11 @@
+      <dependency>
+        <groupId>org.apiguardian</groupId>
+        <artifactId>apiguardian-api</artifactId>
+        <version>${apiguardian.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.calcite.avatica</groupId>
         <artifactId>avatica-core</artifactId>