diff --git a/Cargo.lock b/Cargo.lock index 098a0afb989e9..e94650f8bc85a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1827,6 +1827,7 @@ dependencies = [ "itertools 0.15.0", "log", "object_store", + "percent-encoding", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index df9cdfed40e00..e56c7dd8a9d22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -185,6 +185,7 @@ parquet = { version = "59.0.0", default-features = false, features = [ ] } pbjson = { version = "0.9.0" } pbjson-types = "0.9" +percent-encoding = "2.3" pin-project = "1" # Should match arrow-flight's version of prost. prost = "0.14.1" diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml index 61b55397137df..abe58f45994be 100644 --- a/datafusion/catalog-listing/Cargo.toml +++ b/datafusion/catalog-listing/Cargo.toml @@ -46,6 +46,7 @@ futures = { workspace = true } itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } +percent-encoding = { workspace = true } [dev-dependencies] chrono = { workspace = true } diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 6409b45f17ccd..53c37ae184790 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -17,6 +17,7 @@ //! Helper functions for the table implementation +use std::borrow::Cow; use std::sync::Arc; use datafusion_catalog::Session; @@ -43,6 +44,7 @@ use datafusion_expr::{Expr, Volatility}; use datafusion_physical_expr::create_physical_expr; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use percent_encoding::percent_decode_str; /// Check whether the given expression can be resolved using only the columns `col_names`. /// This means that if this function returns true: @@ -343,7 +345,7 @@ fn try_into_partitioned_file( .into_iter() .zip(partition_cols) .map(|(parsed, (_, datatype))| { - ScalarValue::try_from_string(parsed.to_string(), datatype) + ScalarValue::try_from_string(parsed.into_owned(), datatype) }) .collect::>>()?; @@ -435,12 +437,15 @@ fn object_meta_to_partitioned_file( } /// Extract the partition values for the given `file_path` (in the given `table_path`) -/// associated to the partitions defined by `table_partition_cols` +/// associated to the partitions defined by `table_partition_cols`. +/// +/// Partition values are percent-decoded to match Hive-style object-store paths +/// that encode special characters in path segments. pub fn parse_partitions_for_path<'a, I>( table_path: &ListingTableUrl, file_path: &'a Path, table_partition_cols: I, -) -> Option> +) -> Option>> where I: IntoIterator, { @@ -449,7 +454,13 @@ where let mut part_values = vec![]; for (part, expected_partition) in subpath.zip(table_partition_cols) { match part.split_once('=') { - Some((name, val)) if name == expected_partition => part_values.push(val), + Some((name, val)) if name == expected_partition => { + // Preserve the original value if percent-decoding produces invalid UTF-8. + let decoded = percent_decode_str(val) + .decode_utf8() + .unwrap_or(Cow::Borrowed(val)); + part_values.push(decoded); + } _ => { debug!( "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'", @@ -525,7 +536,7 @@ mod tests { #[test] fn test_parse_partitions_for_path() { assert_eq!( - Some(vec![]), + Some(vec![] as Vec>), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), &Path::from("bucket/mytable/file.csv"), @@ -549,7 +560,7 @@ mod tests { ) ); assert_eq!( - Some(vec!["v1"]), + Some(vec![Cow::Borrowed("v1")]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), &Path::from("bucket/mytable/mypartition=v1/file.csv"), @@ -557,7 +568,48 @@ mod tests { ) ); assert_eq!( - Some(vec!["v1"]), + Some(vec![Cow::::Owned("v/1".to_string())]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/mypartition=v%2F1/file.csv").unwrap(), + vec!["mypartition"] + ) + ); + assert_eq!( + Some(vec![Cow::::Owned("John Doe".to_string())]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/name=John%20Doe/file.csv").unwrap(), + vec!["name"] + ) + ); + assert_eq!( + Some(vec![Cow::::Owned("test dir/file".to_string())]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/mypartition=test%20dir%2Ffile/file.csv") + .unwrap(), + vec!["mypartition"] + ) + ); + assert_eq!( + Some(vec![Cow::::Owned("é".to_string())]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/mypartition=%C3%A9/file.csv").unwrap(), + vec!["mypartition"] + ) + ); + assert_eq!( + Some(vec![Cow::Borrowed("%FF")]), + parse_partitions_for_path( + &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), + &Path::parse("bucket/mytable/mypartition=%FF/file.csv").unwrap(), + vec!["mypartition"] + ) + ); + assert_eq!( + Some(vec![Cow::Borrowed("v1")]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(), &Path::from("bucket/mytable/mypartition=v1/file.csv"), @@ -574,7 +626,7 @@ mod tests { ) ); assert_eq!( - Some(vec!["v1", "v2"]), + Some(vec![Cow::Borrowed("v1"), Cow::Borrowed("v2")]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"), @@ -582,7 +634,7 @@ mod tests { ) ); assert_eq!( - Some(vec!["v1"]), + Some(vec![Cow::Borrowed("v1")]), parse_partitions_for_path( &ListingTableUrl::parse("file:///bucket/mytable").unwrap(), &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"), @@ -614,6 +666,32 @@ mod tests { ); } + #[test] + fn test_try_into_partitioned_file_decodes_partition_value() { + let table_path = ListingTableUrl::parse("file:///bucket/mytable").unwrap(); + let partition_cols = vec![("category".to_string(), DataType::Utf8)]; + let meta = ObjectMeta { + location: Path::parse( + "bucket/mytable/category=Electronics%2FComputers/data.parquet", + ) + .unwrap(), + last_modified: chrono::Utc::now(), + size: 100, + e_tag: None, + version: None, + }; + + let result = + try_into_partitioned_file(meta, &partition_cols, &table_path).unwrap(); + assert!(result.is_some()); + let pf = result.unwrap(); + assert_eq!(pf.partition_values.len(), 1); + assert_eq!( + pf.partition_values[0], + ScalarValue::Utf8(Some("Electronics/Computers".to_string())) + ); + } + #[test] fn test_try_into_partitioned_file_root_file_skipped() { // File in root directory (not inside any partition path) should be