-
Notifications
You must be signed in to change notification settings - Fork 300
feat: support sort_array expression #3706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
adb4e57
0c3a13d
d47f856
2a08799
3721201
8fea480
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -105,7 +105,7 @@ | |
| - [ ] sequence | ||
| - [ ] shuffle | ||
| - [ ] slice | ||
| - [ ] sort_array | ||
| - [x] sort_array | ||
|
|
||
| ### bitwise_funcs | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,11 +21,12 @@ package org.apache.comet.serde | |
|
|
||
| import scala.annotation.tailrec | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size} | ||
| import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size, SortArray} | ||
| import org.apache.spark.sql.catalyst.util.GenericArrayData | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types._ | ||
|
|
||
| import org.apache.comet.CometConf | ||
| import org.apache.comet.CometSparkSessionExtensions.withInfo | ||
| import org.apache.comet.serde.QueryPlanSerde._ | ||
| import org.apache.comet.shims.CometExprShim | ||
|
|
@@ -200,6 +201,80 @@ object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] { | |
| } | ||
| } | ||
|
|
||
| object CometSortArray extends CometExpressionSerde[SortArray] { | ||
| private def containsFloatingPoint(dt: DataType): Boolean = { | ||
|
grorge123 marked this conversation as resolved.
|
||
| dt match { | ||
| case FloatType | DoubleType => true | ||
| case ArrayType(elementType, _) => containsFloatingPoint(elementType) | ||
| case StructType(fields) => fields.exists(f => containsFloatingPoint(f.dataType)) | ||
| case MapType(keyType, valueType, _) => | ||
| containsFloatingPoint(keyType) || containsFloatingPoint(valueType) | ||
| case _ => false | ||
| } | ||
| } | ||
|
|
||
| private def supportedSortArrayElementType( | ||
| dt: DataType, | ||
| nestedInArray: Boolean = false): Boolean = { | ||
| dt match { | ||
| // DataFusion's array_sort compares nested arrays through Arrow's rank kernel. | ||
| // That kernel does not support Struct or Null child values, | ||
| // so array<array<struct<...>>> and array<array<null>> would fail at runtime. | ||
| case _: NullType if !nestedInArray => | ||
| true | ||
| case ArrayType(elementType, _) => | ||
| supportedSortArrayElementType(elementType, nestedInArray = true) | ||
| case StructType(fields) if !nestedInArray => | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you add a comment explaining why there is a restriction around structs in arrays?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I have added it. Besides, I found nulltype has a similar problem, I have fixed it. |
||
| fields.forall(f => supportedSortArrayElementType(f.dataType)) | ||
| case _ => | ||
| supportedScalarSortElementType(dt) | ||
| } | ||
| } | ||
|
|
||
| override def getSupportLevel(expr: SortArray): SupportLevel = { | ||
| val elementType = expr.base.dataType.asInstanceOf[ArrayType].elementType | ||
|
|
||
| if (!supportedSortArrayElementType(elementType)) { | ||
| Unsupported(Some(s"Sort on array element type $elementType is not supported")) | ||
| } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() && | ||
| containsFloatingPoint(elementType)) { | ||
| Incompatible( | ||
| Some( | ||
| "Sorting on floating-point is not 100% compatible with Spark, and Comet is running " + | ||
| s"with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " + | ||
| s"${CometConf.COMPAT_GUIDE}")) | ||
| } else { | ||
| Compatible() | ||
| } | ||
| } | ||
|
|
||
| override def convert( | ||
| expr: SortArray, | ||
| inputs: Seq[Attribute], | ||
| binding: Boolean): Option[ExprOuterClass.Expr] = { | ||
| val arrayExprProto = exprToProtoInternal(expr.base, inputs, binding) | ||
| val (sortDirectionExprProto, nullOrderingExprProto) = expr.ascendingOrder match { | ||
| case Literal(value: Boolean, BooleanType) => | ||
|
grorge123 marked this conversation as resolved.
|
||
| val direction = if (value) "ASC" else "DESC" | ||
| val nullOrdering = if (value) "NULLS FIRST" else "NULLS LAST" | ||
| ( | ||
| exprToProtoInternal(Literal(direction), inputs, binding), | ||
| exprToProtoInternal(Literal(nullOrdering), inputs, binding)) | ||
| case other => | ||
| withInfo(expr, s"ascendingOrder must be a boolean literal: $other") | ||
| (None, None) | ||
| } | ||
|
|
||
| val sortArrayScalarExpr = | ||
| scalarFunctionExprToProto( | ||
| "array_sort", | ||
| arrayExprProto, | ||
| sortDirectionExprProto, | ||
| nullOrderingExprProto) | ||
| optExprWithInfo(sortArrayScalarExpr, expr, expr.children: _*) | ||
| } | ||
| } | ||
|
|
||
| object CometArrayIntersect extends CometExpressionSerde[ArrayIntersect] { | ||
|
|
||
| override def getSupportLevel(expr: ArrayIntersect): SupportLevel = Incompatible(None) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we please combine all true branches?