[SPARK-56046][SQL] Typed SPJ partition key Reducers #54884
peter-toth wants to merge 6 commits into apache:master

Conversation
object YearsFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
  override def inputTypes(): Array[DataType] = Array(TimestampType)
- override def resultType(): DataType = LongType
+ override def resultType(): DataType = IntegerType
I changed the test years transform to return IntegerType and the test days transform to return DateType logical types, because those 2 differ but have the same PhysicalIntegerType physical type.
I also made days reducible to years, which is very similar to what Iceberg can do with hours and days.
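The days-to-years reduction mentioned above can be sketched outside Spark in a few lines of plain Java (the class and method names here are illustrative, not part of the Spark or Iceberg API): a days-since-epoch key can always be coarsened into a years-since-1970 key, which is why days is reducible to years, analogous to Iceberg reducing hours to days.

```java
import java.time.LocalDate;

public class DaysToYearsSketch {
    // days-since-epoch -> years-since-1970: the reduction direction
    // discussed above (hypothetical helper, for illustration only)
    static int daysToYears(int epochDay) {
        return LocalDate.ofEpochDay(epochDay).getYear() - 1970;
    }

    public static void main(String[] args) {
        System.out.println(daysToYears(0));    // 1970-01-01 -> 0
        System.out.println(daysToYears(365));  // 1971-01-01 -> 1
        System.out.println(daysToYears(-1));   // 1969-12-31 -> -1
    }
}
```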
cc @szehon-ho , @dongjoon-hyun
/**
 * Returns the {@link DataType data type} of values produced by this function.
 * It can return null to signal it doesn't change the input type.
It's a little counter-intuitive design. May I ask why we need to use null instead of returning the input type, @peter-toth ?
Unfortunately, in this interface we don't have access to the original transform function's result type (the type argument I is not a Spark logical type), but we need to return some default value to indicate that the reducer doesn't change the result type (whatever it is).
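The "null means unchanged" pattern being debated can be modeled in a self-contained sketch (plain Java, outside Spark; the interface, method names, and String-based type modeling are all simplifications, not the real API):

```java
// Hypothetical, simplified model of the shape under discussion: the reducer's
// resultType() defaults to null, which the caller treats as "same as the
// transform's own result type". Logical types are modeled as plain Strings.
interface SketchReducer<I, O> {
    O reduce(I input);
    default String resultType() { return null; }  // null = type unchanged
}

public class NullDefaultSketch {
    static String effectiveType(SketchReducer<?, ?> r, String transformType) {
        // fall back to the transform's result type when the reducer
        // doesn't declare one of its own
        return r.resultType() != null ? r.resultType() : transformType;
    }

    public static void main(String[] args) {
        // a bucket-like reducer changes values but not their type
        SketchReducer<Integer, Integer> bucketLike = x -> x % 8;
        System.out.println(effectiveType(bucketLike, "IntegerType"));  // IntegerType
    }
}
```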
Could you elaborate that in the function description explicitly, please?
I was thinking about this. Does this change make it more intuitive?
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
    .createWithDefault(false)

  val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES =
    buildConf("spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled")
Do you think we can set this configuration to false for some cases in the future, @peter-toth ? I'm a little confused about when it makes sense to disallow incompatible transform types.
This is a good question and I too was thinking about it. I feel we should not compare different logical types due to their different semantic meanings, but seemingly this is what we currently do in some cases, so we should probably keep the behavior for now. I think in a future Spark release we can change this config to make sure a comparison makes sense.
Yeah, I'm also thinking that if there is some dangerous discrepancy now, it is worth a behavior change to fix it.
The only consumer that I know of is Iceberg, which has an hoursToDay reducer that changes type, and a bucketReducer (which doesn't change type). Iceberg will need to recompile against Spark 4.2 anyway, so it's an opportunity for us to fix it there.
WDYT (as regards the Spark release policy)?
Yeah, very likely Iceberg is the only project that implemented reducers.
If we are ok with fixing the issue in Iceberg then probably we don't need the latest commit, but we can keep resultType() in Reducer, remove its default value and drop this config.
I'm actively testing Spark 4.2.0 integration in Iceberg. The issue was only in 4.2.0-preview3 and I can work on the Iceberg changes for next preview release. +1 to drop this config.
Ok with me.
@szehon-ho , @dongjoon-hyun let me know your preference and I can change this PR.
cc @aokolnychyi , @cloud-fan , @gengliangwang , too.
}

- object YearsFunction extends ScalarFunction[Long] {
+ object YearsFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
Do you think we can spin-off this one independently from this PR?
Technically I could, but I don't see a strong reason to separate these test function changes from the other parts of the PR.
We could argue that a spin-off makes sense to make these functions similar to their Iceberg versions, but that's not necessarily needed for the existing generic DSv2 tests. Actually, this particular PR requires 2 test functions with different logical but the same physical result types, and making them similar to their Iceberg versions is just a coincidence.
But just let me know if you think a spin-off still makes sense.
It's simply that I can help you more easily if you offer a spin-off PR for this PR (in terms of review speed, based on the narrowed review scope).
Anyway, it's up to you, @peter-toth . You can keep all in the single bucket as you wish. No arguable point on that. :)
Thank you for catching this and providing a fix promptly, @peter-toth .

cc @szehon-ho as well
I'm taking a look, thanks
/**
 * Returns the {@link DataType data type} of values produced by this reducer.
 *
 * As a reducer doesn't know the result {@link DataType data type} of the reduced transform
This comment is a bit confusing? Do you mean, 'if the reducer doesn't know the result...'?
Anyway, this was a mistake on my end when introducing the API, sorry about it. I think the resultType is actually pretty important to have.
One more thought: we can also clarify the class tag O to use the same language (result), and indicate that it's the physical Java type, to reduce confusion.
Yeah, this sentence was confusing.
In the latest commit I extracted the new resultType() method to TypedReducer to address @dongjoon-hyun's comment about the counter-intuitive null usage (#54884 (comment)) and elaborated on the types.
The previous behavior of always using the left side key type is indeed problematic, but the new rule looks too strict. Is it possible to follow the behavior of join key type mismatch handling? When a join has an
I think this is a bit of a different issue to type coercion, as the
@peter-toth , this sounds reasonable. Maybe we should emphasize that in the javadocs? the
What changes were proposed in this pull request?
This PR adds a new method to SPJ partition key `Reducer`s to return the type of a reduced partition key.

Why are the changes needed?
After the SPJ refactor (`KeyGroupedPartitioning` and Storage Partition Join #54330) some Iceberg SPJ tests, that join an `hours` transform partitioned table with a `days` transform partitioned table, started to fail. This is because after the refactor the keys of a `KeyGroupedPartitioning` are `InternalRowComparableWrapper`s, which include the type of the key, and when the partition keys are reduced the type of the reduced keys is inherited from their original type.

This means that when `hours` transformed hour keys are reduced to days, the keys actually remain having `IntegerType`, while the `days` transformed keys have `DateType` in Iceberg. This type difference causes the left and right side `InternalRowComparableWrapper`s not to be considered equal despite their `InternalRow` raw key data being equal.

Before the refactor the type of (possibly reduced) partition keys was not stored in the partitioning. When the left and right side raw keys were compared in `EnsureRequirements` a common comparator was initialized with the type of the left side keys. So in the Iceberg SPJ tests the `IntegerType` keys were forced to be interpreted as `DateType`, or the `DateType` keys were forced to be interpreted as `IntegerType`, depending on the join order of the tables.

The reason why this was not causing any issues is that the `PhysicalDataType` of both `DateType` and `IntegerType` logical types is `PhysicalIntegerType`.

This PR:
- adds a new `TypedReducer` with a `resultType()` method to return the correct type of the reduced keys.
- adds a `spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled=true` flag to keep the old behavior and consider the reduced key types the same if they share a common physical type.

Does this PR introduce any user-facing change?
Yes, the reduced key types are now properly compared and incompatibilities are reported to users, but the legacy flag can allow the old behaviour.
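The wrapper behaviour described in the sections above can be modeled in a few lines (plain Java; the `KeyWrapper` record is only a simplified stand-in for `InternalRowComparableWrapper`): two keys with identical raw int data compare unequal once the logical type is part of the wrapper's equality.

```java
public class KeyWrapperSketch {
    // Simplified stand-in for a partition key wrapper whose equality
    // includes the key's logical type, as described in this PR
    record KeyWrapper(String logicalType, int rawKey) { }

    public static void main(String[] args) {
        KeyWrapper left  = new KeyWrapper("IntegerType", 19000);  // reduced hours key
        KeyWrapper right = new KeyWrapper("DateType", 19000);     // days key

        // same raw data, different logical type -> not equal
        System.out.println(left.equals(right));               // false
        System.out.println(left.rawKey() == right.rawKey());  // true
    }
}
```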
How was this patch tested?
Added new UTs.
Was this patch authored or co-authored using generative AI tooling?
No.