feat: support spark compatible int to timestamp cast #20555
feat: support spark compatible int to timestamp cast #20555 — coderfender wants to merge 1 commit into apache:main from
Conversation
339a475 to
05c433b
Compare
05c433b to
9e71267
Compare
| #[derive(Debug, PartialEq, Eq, Hash)] | ||
| pub struct Cast { | ||
| signature: Signature, | ||
| } |
| } | ||
|
|
||
| fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> { | ||
| // for now we will be supporting int -> timestamp and keep adding more spark-compatible spark |
There was a problem hiding this comment.
| // for now we will be supporting int -> timestamp and keep adding more spark-compatible spark | |
| // for now we will be supporting int -> timestamp and keep adding more spark-compatible casts |
?!
| &self.signature | ||
| } | ||
|
|
||
| fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> { |
There was a problem hiding this comment.
Consider implementing return_field_from_args() instead.
It gives better support for deciding whether the return type is nullable or not.
See datafusion/functions/src/core/coalesce.rs for inspiration.
Something like:
fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult<DataType> {
internal_err!("return_field_from_args should be used instead")
}
fn return_field_from_args(&self, args: ReturnFieldArgs) -> DataFusionResult<FieldRef> {
let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
Ok(Arc::new(Field::new(
self.name(),
DataType::Timestamp(TimeUnit::Microsecond, None),
nullable,
)))
}| use std::sync::Arc; | ||
| const MICROS_PER_SECOND: i64 = 1_000_000; | ||
|
|
||
| #[derive(Debug, PartialEq, Eq, Hash)] |
There was a problem hiding this comment.
Please add some documentation.
With a link to the Spark function that is implemented.
| impl Cast { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| signature: Signature::any(1, Volatility::Immutable), |
There was a problem hiding this comment.
I think the signature should specify a target type.
At the moment only TimestampMicrosecond is supported but later when other types are needed you will need to add the second parameter.
SELECT spark_cast(arrow_cast(0, 'Int8')); does not tell me in any way that 0_i8 will be cast to a timestamp.
| if arr.is_null(i) { | ||
| builder.append_null(); | ||
| } else { | ||
| let micros = (arr.value(i).into()).saturating_mul(MICROS_PER_SECOND); |
There was a problem hiding this comment.
Spark does not saturate.
spark-sql (default)> SELECT cast(1 AS TIMESTAMP);
1970-01-01 02:00:01
Time taken: 1.06 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(987654321 AS TIMESTAMP);
2001-04-19 07:25:21
Time taken: 0.035 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(987654321012 AS TIMESTAMP);
+33267-07-09 09:30:12
Time taken: 0.036 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(987654321012345 AS TIMESTAMP);
+294247-01-10 06:00:54.775807
Time taken: 0.035 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(9876543210123456789 AS TIMESTAMP);
+282703-12-03 02:32:57.380672
Time taken: 0.034 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(98765432101234567890987654321 AS TIMESTAMP);
-68156-01-09 16:20:08.49952
Time taken: 0.04 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(98765432101234567890987654321434636434636432463463462362362 AS TIMESTAMP);
[DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 59 exceeds max precision 38. SQLSTATE: 22003
org.apache.spark.SparkArithmeticException: [DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 59 exceeds max precision 38. SQLSTATE: 22003
at org.apache.spark.sql.errors.DataTypeErrors$.decimalPrecisionExceedsMaxPrecisionError(DataTypeErrors.scala:45)
at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:52)
at org.apache.spark.sql.types.DecimalType$.fromDecimal(DecimalType.scala:142)
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:85)
| ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, | ||
| }; | ||
| use std::any::Any; | ||
| use std::sync::Arc; |
There was a problem hiding this comment.
| use std::sync::Arc; | |
| use std::sync::Arc; | |
| let args = make_args(ColumnarValue::Scalar(ScalarValue::Int8(Some(100)))); | ||
| let result = cast.invoke_with_args(args).unwrap(); | ||
| assert_scalar_timestamp(result, 100_000_000); | ||
| } |
There was a problem hiding this comment.
| } | |
| } | |
| #[test] | |
| fn test_cast_scalar_int16() { | |
| let cast = Cast::new(); | |
| let args = make_args(ColumnarValue::Scalar(ScalarValue::Int16(Some(100)))); | |
| let result = cast.invoke_with_args(args).unwrap(); | |
| assert_scalar_timestamp(result, 100_000_000); | |
| } |
| use datafusion_expr::ScalarUDF; | ||
| use std::sync::Arc; | ||
|
|
||
| pub mod expr_fn {} |
There was a problem hiding this comment.
Maybe add an export_functions! here ?!
| query P | ||
| SELECT spark_cast(NULL::bigint); | ||
| ---- | ||
| NULL |
There was a problem hiding this comment.
How does it behave for SELECT spark_cast(NULL); ? Let's add it
You may need to add | DataType::Null to line 85
|
Thank you. I will address the review comments shortly.
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
spark_cast as a first step. The next step would be to make changes to the planner to leverage the Spark-compatible cast instead of regular cast operations