-
Notifications
You must be signed in to change notification settings - Fork 12
[Issue #4] Make writing of DataFrames more flexible #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,58 +8,100 @@ namespace ParquetSharp | |
| public static class DataFrameExtensions | ||
| { | ||
| /// <summary> | ||
| /// Write a DataFrame in Parquet format | ||
| /// Writes a Dataframe in Parquet format using existing writer. | ||
| /// </summary> | ||
| /// <param name="dataFrame">The DataFrame to write</param> | ||
| /// <param name="dataFrame">DataFrame to write.</param> | ||
| /// <param name="fileWriter">Writer to use.</param> | ||
| /// <param name="rowGroupSize">Maximum number of rows per row group</param> | ||
| public static void ToParquet(this DataFrame dataFrame, ParquetFileWriter fileWriter, int rowGroupSize = 1024 * 1024) | ||
| { | ||
| long numRows = dataFrame.Rows.Count; | ||
| long offset = 0L; | ||
|
|
||
| while (offset < numRows) | ||
| { | ||
| int batchSize = (int)Math.Min(numRows - offset, rowGroupSize); | ||
| using RowGroupWriter rowGroupWriter = fileWriter.AppendRowGroup(); | ||
| foreach (DataFrameColumn dataFrameColumn in dataFrame.Columns) | ||
| { | ||
| using ColumnWriter columnWriter = rowGroupWriter.NextColumn(); | ||
| using LogicalColumnWriter logicalWriter = columnWriter.LogicalWriter(); | ||
| logicalWriter.Apply(new DataFrameWriter(dataFrameColumn, offset, batchSize)); | ||
| } | ||
|
|
||
| offset += batchSize; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Write DataFrames in Parquet format | ||
| /// </summary> | ||
| /// <param name="dataFrame">DataFrames to write</param> | ||
| /// <param name="path">Path to write to</param> | ||
| /// <param name="writerProperties">Optional writer properties that override the default properties</param> | ||
| /// <param name="logicalTypeOverrides">Mapping from column names to Parquet logical types, | ||
| /// overriding the default logical types. When writing decimal columns, a logical type must be provided | ||
| /// to specify the precision and scale to use.</param> | ||
| /// <param name="rowGroupSize">Maximum number of rows per row group</param> | ||
| public static void ToParquet( | ||
| this DataFrame dataFrame, string path, WriterProperties? writerProperties = null, | ||
| this IEnumerable<DataFrame> dataFrames, string path, WriterProperties? writerProperties = null, | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Easy overload to write multiple DataFrames. They must be of the same schema; this is currently not enforced. |
||
| IReadOnlyDictionary<string, LogicalType>? logicalTypeOverrides = null, int rowGroupSize = 1024 * 1024) | ||
| { | ||
| var schemaColumns = dataFrame.Columns.Select(col => GetSchemaColumn( | ||
| col, logicalTypeOverrides != null && logicalTypeOverrides.TryGetValue(col.Name, out var logicalType) ? logicalType : null)).ToArray(); | ||
| using var fileWriter = writerProperties == null | ||
| ? new ParquetFileWriter(path, schemaColumns) | ||
| : new ParquetFileWriter(path, schemaColumns, writerProperties); | ||
| if (dataFrames is null) | ||
| throw new ArgumentNullException(nameof(dataFrames)); | ||
|
|
||
| var numRows = dataFrame.Rows.Count; | ||
| var offset = 0L; | ||
| DataFrame firstDataFrame = dataFrames.First(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using |
||
| using ParquetFileWriter fileWriter = GetParquetFileWriter(path, writerProperties, logicalTypeOverrides, firstDataFrame); | ||
|
|
||
| while (offset < numRows) | ||
| foreach (DataFrame dataFrame in dataFrames) | ||
| { | ||
| var batchSize = (int)Math.Min(numRows - offset, rowGroupSize); | ||
| using var rowGroupWriter = fileWriter.AppendRowGroup(); | ||
| foreach (var dataFrameColumn in dataFrame.Columns) | ||
| { | ||
| using var columnWriter = rowGroupWriter.NextColumn(); | ||
| using var logicalWriter = columnWriter.LogicalWriter(); | ||
| logicalWriter.Apply(new DataFrameWriter(dataFrameColumn, offset, batchSize)); | ||
| } | ||
|
|
||
| offset += batchSize; | ||
| dataFrame.ToParquet(fileWriter, rowGroupSize); | ||
| } | ||
|
|
||
| fileWriter.Close(); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Write a DataFrame in Parquet format | ||
| /// </summary> | ||
| /// <param name="dataFrame">The DataFrame to write</param> | ||
| /// <param name="path">Path to write to</param> | ||
| /// <param name="writerProperties">Optional writer properties that override the default properties</param> | ||
| /// <param name="logicalTypeOverrides">Mapping from column names to Parquet logical types, | ||
| /// overriding the default logical types. When writing decimal columns, a logical type must be provided | ||
| /// to specify the precision and scale to use.</param> | ||
| /// <param name="rowGroupSize">Maximum number of rows per row group</param> | ||
| public static void ToParquet(this DataFrame dataFrame, string path, WriterProperties? writerProperties = null, | ||
| IReadOnlyDictionary<string, LogicalType>? logicalTypeOverrides = null, int rowGroupSize = 1024 * 1024) | ||
| { | ||
| using ParquetFileWriter fileWriter = GetParquetFileWriter(path, writerProperties, logicalTypeOverrides, dataFrame); | ||
| dataFrame.ToParquet(fileWriter, rowGroupSize); | ||
|
|
||
| fileWriter.Close(); | ||
| } | ||
|
|
||
| private static ParquetFileWriter GetParquetFileWriter( | ||
| string path, WriterProperties writerProperties, | ||
| IReadOnlyDictionary<string, LogicalType> logicalTypeOverrides, DataFrame firstDataFrame) | ||
| { | ||
| Column[] schemaColumns = firstDataFrame.Columns.Select(col => GetSchemaColumn( | ||
| col, | ||
| (logicalTypeOverrides != null && logicalTypeOverrides.TryGetValue(col.Name, out LogicalType logicalType)) | ||
| ? logicalType : null)).ToArray(); | ||
|
|
||
| return writerProperties == null | ||
| ? new ParquetFileWriter(path, schemaColumns) | ||
| : new ParquetFileWriter(path, schemaColumns, writerProperties); | ||
| } | ||
|
|
||
| private static Column GetSchemaColumn(DataFrameColumn column, LogicalType? logicalTypeOverride) | ||
| { | ||
| var dataType = column.DataType; | ||
| var nullable = column.NullCount > 0; | ||
| Type dataType = column.DataType; | ||
| if (dataType == typeof(decimal) && logicalTypeOverride == null) | ||
| { | ||
| throw new ArgumentException($"Logical type override must be specified for decimal column '{column.Name}'"); | ||
| } | ||
|
|
||
| if (nullable && dataType.IsValueType) | ||
| { | ||
| if (dataType.IsValueType && Nullable.GetUnderlyingType(dataType) != null) | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's a bug to change the schema of a Parquet file depending on the data within a particular row group. If you are writing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I think it makes sense to always write value types as nullable, as DataFrame columns are always nullable. This is going to be required to support writing an This change looks wrong though. When you have a Can you also please add the surrounding curly braces back to match our code style. |
||
| dataType = typeof(Nullable<>).MakeGenericType(dataType); | ||
| } | ||
|
|
||
| return new Column(dataType, column.Name, logicalTypeOverride); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: add unit tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 could you also please document this feature in the README file