98 changes: 70 additions & 28 deletions ParquetSharp.DataFrame/DataFrameExtensions.cs
@@ -8,58 +8,100 @@ namespace ParquetSharp
public static class DataFrameExtensions
{
/// <summary>
/// Write a DataFrame in Parquet format
/// Writes a DataFrame in Parquet format using an existing writer.
Author: TODO: add unit tests.
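
A possible starting point for such a test (a sketch only, not part of this PR; it assumes xunit and this library's existing ToDataFrame reader extension):

```csharp
using System.Linq;
using Microsoft.Data.Analysis;
using ParquetSharp;
using Xunit;

public class DataFrameToParquetTests
{
    // Sketch only: round-trip a DataFrame through the new writer overload.
    [Fact]
    public void ToParquet_WithExistingWriter_RoundTripsAllRows()
    {
        var path = System.IO.Path.GetTempFileName();
        var ints = new Int32DataFrameColumn("ints", Enumerable.Range(0, 100).Select(i => (int?)i));
        var dataFrame = new DataFrame(ints);

        // The caller owns the writer with the new overload, so build it first.
        using (var fileWriter = new ParquetFileWriter(path, new Column[] { new Column<int?>("ints") }))
        {
            // Small row group size so the batching loop is exercised.
            dataFrame.ToParquet(fileWriter, rowGroupSize: 32);
            fileWriter.Close();
        }

        using var fileReader = new ParquetFileReader(path);
        DataFrame roundTripped = fileReader.ToDataFrame();
        Assert.Equal(100L, roundTripped.Rows.Count);
    }
}
```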

Contributor: 👍 could you also please document this feature in the README file.
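
For example, the README could show something along these lines (suggested wording only, based on the signatures in this diff; dataFrame, frameA and frameB are placeholders):

```csharp
// Write one DataFrame through a writer the caller manages:
using var fileWriter = new ParquetFileWriter("data.parquet", new Column[] { new Column<int?>("ints") });
dataFrame.ToParquet(fileWriter);
fileWriter.Close();

// Or write a sequence of same-schema DataFrames to a single file:
new[] { frameA, frameB }.ToParquet("frames.parquet");
```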

/// </summary>
/// <param name="dataFrame">The DataFrame to write</param>
/// <param name="dataFrame">DataFrame to write.</param>
/// <param name="fileWriter">Writer to use.</param>
/// <param name="rowGroupSize">Maximum number of rows per row group</param>
public static void ToParquet(this DataFrame dataFrame, ParquetFileWriter fileWriter, int rowGroupSize = 1024 * 1024)
{
long numRows = dataFrame.Rows.Count;
long offset = 0L;

while (offset < numRows)
{
int batchSize = (int)Math.Min(numRows - offset, rowGroupSize);
using RowGroupWriter rowGroupWriter = fileWriter.AppendRowGroup();
foreach (DataFrameColumn dataFrameColumn in dataFrame.Columns)
{
using ColumnWriter columnWriter = rowGroupWriter.NextColumn();
using LogicalColumnWriter logicalWriter = columnWriter.LogicalWriter();
logicalWriter.Apply(new DataFrameWriter(dataFrameColumn, offset, batchSize));
}

offset += batchSize;
}
}

/// <summary>
/// Write DataFrames in Parquet format
/// </summary>
/// <param name="dataFrame">DataFrames to write</param>
/// <param name="path">Path to write to</param>
/// <param name="writerProperties">Optional writer properties that override the default properties</param>
/// <param name="logicalTypeOverrides">Mapping from column names to Parquet logical types,
/// overriding the default logical types. When writing decimal columns, a logical type must be provided
/// to specify the precision and scale to use.</param>
/// <param name="rowGroupSize">Maximum number of rows per row group</param>
public static void ToParquet(
this DataFrame dataFrame, string path, WriterProperties? writerProperties = null,
this IEnumerable<DataFrame> dataFrames, string path, WriterProperties? writerProperties = null,
Author: Easy overload to write multiple DataFrames. They must all have the same schema; this is currently not enforced.
TODO: ensure that the schema is the same for all DataFrames in the sequence.
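
A rough sketch of what that check could look like (hypothetical helper, not in this PR), comparing each frame's column names and types against the first:

```csharp
// Hypothetical guard: verify a DataFrame matches the schema of the first
// frame in the sequence (column count, names and types).
private static void EnsureSameSchema(DataFrame first, DataFrame other)
{
    if (first.Columns.Count != other.Columns.Count)
    {
        throw new ArgumentException("All DataFrames must have the same number of columns");
    }

    for (int i = 0; i < first.Columns.Count; ++i)
    {
        if (first.Columns[i].Name != other.Columns[i].Name ||
            first.Columns[i].DataType != other.Columns[i].DataType)
        {
            throw new ArgumentException($"Column {i} differs from the first DataFrame: " +
                $"expected '{first.Columns[i].Name}' ({first.Columns[i].DataType}), " +
                $"got '{other.Columns[i].Name}' ({other.Columns[i].DataType})");
        }
    }
}
```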

IReadOnlyDictionary<string, LogicalType>? logicalTypeOverrides = null, int rowGroupSize = 1024 * 1024)
{
var schemaColumns = dataFrame.Columns.Select(col => GetSchemaColumn(
col, logicalTypeOverrides != null && logicalTypeOverrides.TryGetValue(col.Name, out var logicalType) ? logicalType : null)).ToArray();
using var fileWriter = writerProperties == null
? new ParquetFileWriter(path, schemaColumns)
: new ParquetFileWriter(path, schemaColumns, writerProperties);
if (dataFrames is null)
throw new ArgumentNullException(nameof(dataFrames));

var numRows = dataFrame.Rows.Count;
var offset = 0L;
DataFrame firstDataFrame = dataFrames.First();
Contributor: Using First() and then a foreach over dataFrames below requires multiple enumerations of the IEnumerable, which won't be compatible with all IEnumerable types. I think you'll probably want to create a null ParquetFileWriter and then assign it on the first iteration of the loop.
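
A minimal sketch of that single-pass approach (illustrative only; it reuses the GetParquetFileWriter helper added in this diff):

```csharp
// Sketch only: enumerate dataFrames exactly once, creating the writer
// from the first frame's schema on the first iteration.
ParquetFileWriter? fileWriter = null;
try
{
    foreach (DataFrame dataFrame in dataFrames)
    {
        fileWriter ??= GetParquetFileWriter(path, writerProperties, logicalTypeOverrides, dataFrame);
        dataFrame.ToParquet(fileWriter, rowGroupSize);
    }

    fileWriter?.Close();
}
finally
{
    fileWriter?.Dispose();
}
```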

using ParquetFileWriter fileWriter = GetParquetFileWriter(path, writerProperties, logicalTypeOverrides, firstDataFrame);

while (offset < numRows)
foreach (DataFrame dataFrame in dataFrames)
{
var batchSize = (int)Math.Min(numRows - offset, rowGroupSize);
using var rowGroupWriter = fileWriter.AppendRowGroup();
foreach (var dataFrameColumn in dataFrame.Columns)
{
using var columnWriter = rowGroupWriter.NextColumn();
using var logicalWriter = columnWriter.LogicalWriter();
logicalWriter.Apply(new DataFrameWriter(dataFrameColumn, offset, batchSize));
}

offset += batchSize;
dataFrame.ToParquet(fileWriter, rowGroupSize);
}

fileWriter.Close();
}

/// <summary>
/// Write a DataFrame in Parquet format
/// </summary>
/// <param name="dataFrame">The DataFrame to write</param>
/// <param name="path">Path to write to</param>
/// <param name="writerProperties">Optional writer properties that override the default properties</param>
/// <param name="logicalTypeOverrides">Mapping from column names to Parquet logical types,
/// overriding the default logical types. When writing decimal columns, a logical type must be provided
/// to specify the precision and scale to use.</param>
/// <param name="rowGroupSize">Maximum number of rows per row group</param>
public static void ToParquet(this DataFrame dataFrame, string path, WriterProperties? writerProperties = null,
IReadOnlyDictionary<string, LogicalType>? logicalTypeOverrides = null, int rowGroupSize = 1024 * 1024)
{
using ParquetFileWriter fileWriter = GetParquetFileWriter(path, writerProperties, logicalTypeOverrides, dataFrame);
dataFrame.ToParquet(fileWriter, rowGroupSize);

fileWriter.Close();
}

private static ParquetFileWriter GetParquetFileWriter(
string path, WriterProperties? writerProperties,
IReadOnlyDictionary<string, LogicalType>? logicalTypeOverrides, DataFrame firstDataFrame)
{
Column[] schemaColumns = firstDataFrame.Columns.Select(col => GetSchemaColumn(
col,
(logicalTypeOverrides != null && logicalTypeOverrides.TryGetValue(col.Name, out LogicalType logicalType))
? logicalType : null)).ToArray();

return writerProperties == null
? new ParquetFileWriter(path, schemaColumns)
: new ParquetFileWriter(path, schemaColumns, writerProperties);
}

private static Column GetSchemaColumn(DataFrameColumn column, LogicalType? logicalTypeOverride)
{
var dataType = column.DataType;
var nullable = column.NullCount > 0;
Type dataType = column.DataType;
if (dataType == typeof(decimal) && logicalTypeOverride == null)
{
throw new ArgumentException($"Logical type override must be specified for decimal column '{column.Name}'");
}

if (nullable && dataType.IsValueType)
{
if (dataType.IsValueType && Nullable.GetUnderlyingType(dataType) != null)
Author (@GKrivosheev-rms, Feb 5, 2022): I think it's a bug to change the schema of a Parquet file depending on the data within a particular row. If you are writing int?, it should always be a nullable column even if there are no actual null values in that specific DataFrame instance. Some readers may expect the exact type.

Contributor (@adamreeve, Feb 7, 2022): Yes, I think it makes sense to always write value types as nullable, as DataFrame columns are always nullable. This is going to be required to support writing an IEnumerable<DataFrame>, as otherwise there could be a scenario where the first file has no nulls but a subsequent file has nulls, which would cause a crash, and we can't iterate over the IEnumerable multiple times to check for nulls unless we were more restrictive and only allowed writing an IReadOnlyList, for example. Making the type nullable depending on the null count was something I wasn't sure about when I wrote this, and this scenario is a good example of why we should just always write value types as nullable.

This change looks wrong, though. When you have a DataFrameColumn of Int32, for example, column.DataType will be System.Int32, not System.Nullable<System.Int32>, even if the column contains null values. So you need to remove the Nullable.GetUnderlyingType(dataType) != null check, as it will always be false. I think you'll find that this is causing some test failures, and you'll also need to update tests that expect non-nullable columns.

Can you also please add the surrounding curly braces back to match our code style.
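
Putting those points together, the method might look something like this (a sketch of the suggested fix, not the final code):

```csharp
// Sketch of the suggested fix: value types are always written as nullable,
// because DataFrame columns are always nullable and column.DataType holds
// the underlying type (e.g. System.Int32), never Nullable<T>.
private static Column GetSchemaColumn(DataFrameColumn column, LogicalType? logicalTypeOverride)
{
    Type dataType = column.DataType;
    if (dataType == typeof(decimal) && logicalTypeOverride == null)
    {
        throw new ArgumentException($"Logical type override must be specified for decimal column '{column.Name}'");
    }

    if (dataType.IsValueType)
    {
        dataType = typeof(Nullable<>).MakeGenericType(dataType);
    }

    return new Column(dataType, column.Name, logicalTypeOverride);
}
```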

dataType = typeof(Nullable<>).MakeGenericType(dataType);
}

return new Column(dataType, column.Name, logicalTypeOverride);
}