feat: support spark compatible int to timestamp cast #20555

Open

coderfender wants to merge 1 commit into apache:main from coderfender:df_int_timestamp_cast
Conversation

@coderfender commented Feb 25, 2026

Which issue does this PR close?

Rationale for this change

What changes are included in this PR?

  1. Created a new cast scalar UDF to support int-to-timestamp casts (Spark compatible). The goal is to use this as an entry point for all Spark-compatible / DataFusion-incompatible casts.

Are these changes tested?

  1. Yes (through a series of unit tests covering edge cases such as overflow, null input, etc.)

Are there any user-facing changes?

  1. Yes. As a first step, this adds the ability to use Spark-compatible cast operations in DataFusion through spark_cast (see the usage sketch below). The next step is to change the planner to use the Spark-compatible cast instead of the regular cast operations.
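
For illustration, a minimal usage sketch (the rendered timestamp format and a UTC session time zone are assumptions):

SELECT spark_cast(100);
-- 1970-01-01T00:01:40
-- the input is interpreted as seconds since the Unix epoch and stored as microseconds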

@github-actions bot added the spark label Feb 25, 2026
@coderfender force-pushed the df_int_timestamp_cast branch 3 times, most recently from 339a475 to 05c433b on February 25, 2026 17:26
@coderfender marked this pull request as draft February 25, 2026 17:28
@coderfender force-pushed the df_int_timestamp_cast branch from 05c433b to 9e71267 on February 25, 2026 19:50
@github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Feb 25, 2026
@coderfender marked this pull request as ready for review February 25, 2026 19:54
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Cast {
    signature: Signature,
}
Member

Suggested change (whitespace-only):
}
}

}

fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
// for now we will be supporting int -> timestamp and keep adding more spark-compatible spark
Member

Suggested change:
-// for now we will be supporting int -> timestamp and keep adding more spark-compatible spark
+// for now we will be supporting int -> timestamp and keep adding more spark-compatible casts

?!

&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> DataFusionResult<DataType> {
Member

Consider implementing return_field_from_args() instead.
It gives better support for deciding whether the return type is nullable or not.
See datafusion/functions/src/core/coalesce.rs for inspiration.

Something like:

fn return_type(&self, _arg_types: &[DataType]) -> DataFusionResult<DataType> {
    internal_err!("return_field_from_args should be used instead")
}

fn return_field_from_args(&self, args: ReturnFieldArgs) -> DataFusionResult<FieldRef> {
    let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
    Ok(Arc::new(Field::new(
        self.name(),
        DataType::Timestamp(TimeUnit::Microsecond, None),
        nullable,
    )))
}

use std::sync::Arc;
const MICROS_PER_SECOND: i64 = 1_000_000;

#[derive(Debug, PartialEq, Eq, Hash)]
Member

Please add some documentation, with a link to the Spark function that is implemented.
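
A minimal sketch of such a doc comment, assuming the integer-as-seconds semantics described in this PR (the exact wording and link target are illustrative):

/// Spark-compatible cast expression.
///
/// Currently supports casting integer values, interpreted as seconds since
/// the Unix epoch, to `Timestamp(Microsecond)`, matching Spark's
/// `CAST(<integer> AS TIMESTAMP)`:
/// <https://spark.apache.org/docs/latest/api/sql/#cast>
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Cast {
    signature: Signature,
}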

impl Cast {
    pub fn new() -> Self {
        Self {
            signature: Signature::any(1, Volatility::Immutable),
Member

I think the signature should specify a target type.
At the moment only TimestampMicrosecond is supported, but later, when other types are needed, you will have to add a second parameter.
SELECT spark_cast(arrow_cast(0, 'Int8')); does not indicate in any way that 0_i8 will be cast to a timestamp.
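
A hedged sketch of one way the signature could carry the target type, as a second string argument (this design is illustrative, not part of the PR):

// Accept any input value plus a string naming the target type, e.g.
// SELECT spark_cast(0, 'timestamp');
signature: Signature::any(2, Volatility::Immutable),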

if arr.is_null(i) {
    builder.append_null();
} else {
    let micros = (arr.value(i).into()).saturating_mul(MICROS_PER_SECOND);
Member

Spark does not saturate.

spark-sql (default)> SELECT cast(1 AS TIMESTAMP);
1970-01-01 02:00:01
Time taken: 1.06 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(987654321 AS TIMESTAMP);
2001-04-19 07:25:21
Time taken: 0.035 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(987654321012 AS TIMESTAMP);
+33267-07-09 09:30:12
Time taken: 0.036 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(987654321012345 AS TIMESTAMP);
+294247-01-10 06:00:54.775807
Time taken: 0.035 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(9876543210123456789 AS TIMESTAMP);
+282703-12-03 02:32:57.380672
Time taken: 0.034 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(98765432101234567890987654321 AS TIMESTAMP);
-68156-01-09 16:20:08.49952
Time taken: 0.04 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT cast(98765432101234567890987654321434636434636432463463462362362 AS TIMESTAMP);
[DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 59 exceeds max precision 38. SQLSTATE: 22003
org.apache.spark.SparkArithmeticException: [DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 59 exceeds max precision 38. SQLSTATE: 22003
        at org.apache.spark.sql.errors.DataTypeErrors$.decimalPrecisionExceedsMaxPrecisionError(DataTypeErrors.scala:45)
        at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:52)
        at org.apache.spark.sql.types.DecimalType$.fromDecimal(DecimalType.scala:142)
        at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:85)

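If the goal is to reproduce the overflow behavior shown above, a hedged sketch, assuming Spark's non-ANSI long-to-timestamp conversion wraps like Java's 64-bit multiplication and that the input value widens losslessly to i64:

// wrap on overflow instead of saturating, matching the Spark output above
let micros = i64::from(arr.value(i)).wrapping_mul(MICROS_PER_SECOND);
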
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;
Member

Suggested change (whitespace-only):
use std::sync::Arc;
use std::sync::Arc;

    let args = make_args(ColumnarValue::Scalar(ScalarValue::Int8(Some(100))));
    let result = cast.invoke_with_args(args).unwrap();
    assert_scalar_timestamp(result, 100_000_000);
}
Member

Suggested change:
 }
+
+#[test]
+fn test_cast_scalar_int16() {
+    let cast = Cast::new();
+    let args = make_args(ColumnarValue::Scalar(ScalarValue::Int16(Some(100))));
+    let result = cast.invoke_with_args(args).unwrap();
+    assert_scalar_timestamp(result, 100_000_000);
+}

use datafusion_expr::ScalarUDF;
use std::sync::Arc;

pub mod expr_fn {}
Member

Maybe add an export_functions! here?!
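
A hedged sketch of what that could look like, loosely following the macro pattern used elsewhere in datafusion/functions (the exact macro arguments are assumptions and would need checking against the actual macro definitions):

// expose a `spark_cast` expression builder from this module (names illustrative)
make_udf_function!(Cast, spark_cast);

pub mod expr_fn {
    export_functions!((
        spark_cast,
        "Spark-compatible cast; currently int (seconds since the epoch) to timestamp.",
        arg
    ));
}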

query P
SELECT spark_cast(NULL::bigint);
----
NULL
Member

How does it behave for SELECT spark_cast(NULL);? Let's add it.
You may need to add | DataType::Null to line 85.
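
A sketch of the corresponding .slt case (the NULL result is the expected behavior under the suggested change, not verified output):

query P
SELECT spark_cast(NULL);
----
NULL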

@coderfender (Author)

Thank you. I will address the review comments shortly.


Labels

spark, sqllogictest (SQL Logic Tests (.slt))

Development

Successfully merging this pull request may close these issues.

Support spark compatible cast from int -> timestamp

2 participants