
[SPARK-56046][SQL] Typed SPJ partition key Reducers#54884

Open
peter-toth wants to merge 6 commits into apache:master from peter-toth:SPARK-56046-typed-spj-reducers

Conversation

@peter-toth (Contributor) commented Mar 18, 2026:

What changes were proposed in this pull request?

This PR adds a new method to SPJ partition key Reducers to return the type of a reduced partition key.

Why are the changes needed?

After the SPJ refactor, some Iceberg SPJ tests that join an hours-transform-partitioned table with a days-transform-partitioned table started to fail. This is because, after the refactor, the keys of a KeyedPartitioning are InternalRowComparableWrappers, which include the type of the key, and when partition keys are reduced, the reduced keys inherit their original type.

This means that when hours-transformed keys are reduced to days, the reduced keys keep their IntegerType, while the days-transformed keys have DateType in Iceberg. This type difference causes the left and right side InternalRowComparableWrappers to be considered unequal even though their raw InternalRow key data are equal.

Before the refactor, the type of (possibly reduced) partition keys was not stored in the partitioning. When the left and right side raw keys were compared in EnsureRequirements, a common comparator was initialized with the type of the left side keys.
So in the Iceberg SPJ tests, the IntegerType keys were forced to be interpreted as DateType, or the DateType keys were forced to be interpreted as IntegerType, depending on the join order of the tables.
The reason this did not cause any issues is that the PhysicalDataType of both the DateType and IntegerType logical types is PhysicalIntegerType.
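The mismatch described above can be illustrated with a small, hypothetical sketch (the class and method names below are made up for illustration and are not the real Spark or Iceberg APIs): the raw reduced key values are numerically equal, but one side is labeled IntegerType and the other DateType, both backed by the same physical 32-bit int.

```java
// Hypothetical illustration, not actual Spark/Iceberg code: an "hours"
// partition value reduced to days matches the "days" partition value
// numerically, even though one side is labeled IntegerType and the other
// DateType -- both share the physical int representation.
public class PhysicalTypeSketch {
    // hours since epoch -> days since epoch (what an hours->days reducer does)
    static int reduceHoursToDays(int hoursSinceEpoch) {
        return Math.floorDiv(hoursSinceEpoch, 24);
    }

    public static void main(String[] args) {
        long epochMicros = 1_700_000_000_000_000L;            // some timestamp
        int hoursKey = (int) (epochMicros / 3_600_000_000L);  // hours transform
        int daysKey  = (int) (epochMicros / 86_400_000_000L); // days transform

        // The raw reduced values are equal...
        System.out.println(reduceHoursToDays(hoursKey) == daysKey); // true
        // ...but one wrapper carries IntegerType and the other DateType,
        // so a type-aware equality check treats them as different keys.
    }
}
```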

This PR:

  • Introduces a new TypedReducer with resultType() method to return the correct type of the reduced keys.
  • Properly compares the left and right side reduced key types and returns an error when they are not the same.
  • Adds a new spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled=true flag to keep the old behavior and consider the reduced key types the same if they share a common physical type.
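The TypedReducer idea from the list above can be sketched with simplified stand-in types (the DataType, SimpleType, and HoursToDaysReducer names here are placeholders for illustration, not Spark's actual classes):

```java
// Simplified sketch of the proposed shape, not the actual Spark interfaces:
// a Reducer variant that also reports the logical type of its reduced keys,
// so both join sides can be compared with matching types.
interface DataType {}                                // stand-in for Spark's DataType
enum SimpleType implements DataType { INTEGER, DATE }

interface Reducer<I, O> {
    O reduce(I value);
}

// The PR's idea: a typed variant that additionally exposes resultType().
interface TypedReducer<I, O> extends Reducer<I, O> {
    DataType resultType();
}

class HoursToDaysReducer implements TypedReducer<Integer, Integer> {
    @Override
    public Integer reduce(Integer hours) {
        return Math.floorDiv(hours, 24); // hours since epoch -> days since epoch
    }

    // Reduced keys are days, which Iceberg models as DateType.
    @Override
    public DataType resultType() {
        return SimpleType.DATE;
    }
}
```

With resultType() available, the planner can compare the left and right side reduced key types instead of silently inheriting the pre-reduction type.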

Does this PR introduce any user-facing change?

Yes, the reduced key types are now properly compared and incompatibilities are reported to users, but the legacy flag can restore the old behavior.

How was this patch tested?

Added new UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@peter-toth force-pushed the SPARK-56046-typed-spj-reducers branch from 580ca49 to fa4bce7 on March 18, 2026 13:54
@peter-toth peter-toth marked this pull request as draft March 18, 2026 16:24
object YearsFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
  override def inputTypes(): Array[DataType] = Array(TimestampType)
- override def resultType(): DataType = LongType
+ override def resultType(): DataType = IntegerType
@peter-toth (Contributor Author) commented Mar 18, 2026:
I changed the test years transform to return IntegerType and the test days transform to return DateType logical types, because those two differ but have the same PhysicalIntegerType physical type.
I also made days reducible to years, which is very similar to what Iceberg can do with hours and days.

@peter-toth (Contributor Author):
cc @szehon-ho , @dongjoon-hyun

@peter-toth peter-toth marked this pull request as ready for review March 18, 2026 17:20

/**
 * Returns the {@link DataType data type} of values produced by this function.
 * It can return null to signal it doesn't change the input type.
Member:
It's a little counter-intuitive design. May I ask why we need to use null instead of returning the input type, @peter-toth?

@peter-toth (Contributor Author) commented Mar 18, 2026:
Unfortunately, in this interface we don't have access to the original transform function's result type (the type argument I is not a Spark logical type), but we need to return some default value to indicate that the reducer doesn't change the result type (whatever it is).
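The design under discussion might be sketched like this (simplified, hypothetical names, not the actual Spark interfaces): resultType() defaults to null because the interface cannot see the original transform's Spark logical type, so null stands for "result type unchanged".

```java
// Sketch of the discussed design, with made-up stand-in types:
interface DataType {}

interface Reducer<I, O> {
    O reduce(I value);

    // Returning null signals: "this reducer doesn't change the input's
    // logical type, whatever that type is." Callers would fall back to the
    // transform's own result type when they see null.
    default DataType resultType() { return null; }
}
```

A reducer that preserves the type (e.g. a bucket reducer) would simply not override the default, while a type-changing reducer would return its concrete result type.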

Member:
Could you elaborate on that in the function description explicitly, please?

@peter-toth (Contributor Author):
Added more documentation in a00c069.

@peter-toth (Contributor Author):
I was thinking about this. Does this change make it more intuitive?

.createWithDefault(false)

val V2_BUCKETING_ALLOW_INCOMPATIBLE_TRANSFORM_TYPES =
buildConf("spark.sql.sources.v2.bucketing.allowIncompatibleTransformTypes.enabled")
Member:
Do you think we can set this configuration to false for some cases in the future, @peter-toth? I'm a little confused about when it makes sense to disallow incompatible transform types.

@peter-toth (Contributor Author):
This is a good question, and I was thinking about it too. I feel we should not compare different logical types due to their different semantic meanings, but seemingly this is what we currently do in some cases, so we should probably keep the behavior for now. I think in a future Spark release we can change this config to make sure a comparison makes sense.

@szehon-ho (Member) commented Mar 19, 2026:
Yeah, I'm also thinking that if there is some dangerous discrepancy now, it is worth a behavior change to fix it.

The only consumer that I know of is Iceberg, which has an hoursToDay reducer that changes the type, and a bucketReducer (which doesn't change the type). Iceberg will need to recompile against Spark 4.2 anyway, so it's an opportunity for us to fix it there.

WDYT (with regard to the Spark release policy)?

@peter-toth (Contributor Author):
Yeah, very likely Iceberg is the only project that has implemented reducers.

If we are OK with fixing the issue in Iceberg, then we probably don't need the latest commit; we can keep resultType() in Reducer, remove its default value, and drop this config.

Member:
I'm actively testing Spark 4.2.0 integration in Iceberg. The issue was only in 4.2.0-preview3, and I can work on the Iceberg changes for the next preview release. +1 to drop this config.

@peter-toth (Contributor Author):
Ok with me.
@szehon-ho, @dongjoon-hyun, let me know your preference and I can change this PR.

@dongjoon-hyun (Member):
cc @aokolnychyi, @cloud-fan, @gengliangwang, too.

}

-object YearsFunction extends ScalarFunction[Long] {
+object YearsFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
Member:
Do you think we can spin-off this one independently from this PR?

@peter-toth (Contributor Author) commented Mar 18, 2026:

Technically I could, but I don't see a strong reason to separate these test function changes from the other parts of the PR.
We could argue that a spin-off makes sense to make these functions similar to their Iceberg versions, but that's not necessarily needed for the existing generic DSv2 tests. Actually, this particular PR requires two test functions with different logical but the same physical result types, and making them similar to their Iceberg versions is just a coincidence.

@peter-toth (Contributor Author):

But just let me know if you think a spin-off still makes sense.

Member:

It's simply that I can help you more easily if you offer a spin-off PR (in terms of review speed, based on the narrowed review scope).

Anyway, it's up to you, @peter-toth. You can keep everything in a single bucket as you wish. No arguable point on that. :)

@dongjoon-hyun (Member) commented Mar 18, 2026:

Thank you for catching this and providing a fix promptly, @peter-toth.
I'll leave this to the other reviewers.

@gengliangwang (Member):

cc @szehon-ho as well

@szehon-ho (Member):

I'm taking a look, thanks.

/**
 * Returns the {@link DataType data type} of values produced by this reducer.
 *
 * As a reducer doesn't know the result {@link DataType data type} of the reduced transform
Member:

This comment is a bit confusing? Do you mean, 'if the reducer doesn't know the result...'?

Anyway, this was a mistake on my end when introducing the API, sorry about it. I think the resultType is actually pretty important to have.

One more thought: we can also clarify the class tag O to use the same language (result), and indicate that it's the physical Java type, to reduce confusion.

@peter-toth (Contributor Author):

Yeah, this sentence was confusing.

In the latest commit I extracted the new resultType() method to TypedReducer to address @dongjoon-hyun's comment about the counter-intuitive null usage (#54884 (comment)) and elaborated on the types.

@pan3793 (Member) commented Mar 19, 2026:

Properly compares the left and right side reduced key types and return an error when they are not the same.

The previous behavior of always using the left side key type is indeed problematic, but the new rule looks too strict. Is it possible to follow the behavior of join key type mismatch handling?

When a join has an EqualTo(leftKey, rightKey) condition where types differ, ImplicitTypeCoercion kicks in:

  1. Calls findTightestCommonType(left.dataType, right.dataType) to find a compatible type
  2. Wraps operands in Cast expressions to coerce both to the common type

@peter-toth (Contributor Author) commented Mar 19, 2026:

When a join has an EqualTo(leftKey, rightKey) condition where types differ, ImplicitTypeCoercion kicks in:

  1. Calls findTightestCommonType(left.dataType, right.dataType) to find a compatible type
  2. Wraps operands in Cast expressions to coerce both to the common type

I think this is a slightly different issue from type coercion, as the ReducibleFunctions on both sides know about each other when they return the Reducers. It is the Reducers' responsibility to produce comparable reduced values. The only issue now is that we don't know the type of those values.

@pan3793 (Member) commented Mar 19, 2026:

The Reducers' responsibility to produce comparable reduced values.

@peter-toth, this sounds reasonable; maybe we should emphasize that in the javadocs? The = check requires that both the value and the data type match exactly:

  • ... r(f_source(x)) = f_target(x) ...
  • ... r1(f_source(x)) = r2(f_target(x)) ...
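The first of the equalities above can be checked with a hypothetical hours/days pair (the transforms below are simplified stand-ins with made-up names, not Iceberg's implementations): a reducer r from the source transform (hours) to the target transform (days) must satisfy r(f_hours(x)) = f_days(x) for every input x.

```java
// Hypothetical property check for r(f_source(x)) = f_target(x), using
// simplified hours/days transforms over epoch microseconds.
public class ReducerProperty {
    static long fHours(long micros) { return micros / 3_600_000_000L; }  // hours since epoch
    static long fDays(long micros)  { return micros / 86_400_000_000L; } // days since epoch
    static long r(long hours)       { return Math.floorDiv(hours, 24); } // hours -> days reducer

    public static void main(String[] args) {
        // Sample a range of timestamps and verify the reducer property holds.
        for (long x = 0; x < 2_000_000_000_000_000L; x += 123_456_789_012_345L) {
            if (r(fHours(x)) != fDays(x)) {
                throw new AssertionError("mismatch at x=" + x);
            }
        }
        System.out.println("r(f_hours(x)) == f_days(x) held for all samples");
    }
}
```

Even when this value equality holds, the wrappers only compare equal if the data types also match, which is why documenting the exact-type requirement matters.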
