diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 4ec7a73afe..992b3c1db0 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1676,22 +1676,18 @@ def _task_to_record_batches(
                 # Create the mask of indices that we're interested in
                 indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
                 current_batch = current_batch.take(indices)
+                if pyarrow_filter is not None:
+                    try:
+                        current_batch = current_batch.filter(pyarrow_filter)
+                    except IndexError:
+                        # PyArrow < 21 raises IndexError when filter produces zero rows
+                        # (fixed in https://github.com/apache/arrow/pull/46057)
+                        current_batch = current_batch.slice(0, 0)
 
                 # skip empty batches
                 if current_batch.num_rows == 0:
                     continue
 
-                # Apply the user filter
-                if pyarrow_filter is not None:
-                    # Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
-                    table = pa.Table.from_batches([current_batch])
-                    table = table.filter(pyarrow_filter)
-                    # skip empty batches
-                    if table.num_rows == 0:
-                        continue
-
-                    current_batch = table.combine_chunks().to_batches()[0]
-
             yield _to_requested_schema(
                 projected_schema,
                 file_project_schema,
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 2f36661a1f..b1bd622883 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -3267,6 +3267,130 @@ def _expected_batch(unit: str) -> pa.RecordBatch:
     assert _expected_batch("ns" if format_version > 2 else "us").equals(actual_result)
 
 
+def test_task_to_record_batches_scanner_filter_not_set_with_positional_deletes(tmpdir: str) -> None:
+    """Regression test for https://github.com/apache/iceberg-python/issues/3272.
+
+    When positional deletes are present the scanner must NOT receive the row filter as a
+    push-down predicate.  Positional-delete indices reference absolute row positions in the
+    original file; if the scanner filters rows first the surviving rows shift and the
+    indices no longer map correctly, producing silently wrong results.
+
+    The test chooses data where the old (buggy) code path gives a distinct wrong answer:
+    - File rows (positions 0-3): [1, 2, 3, 4]
+    - Positional delete: position 2  →  removes value 3  →  survivors [1, 2, 4]
+    - Row filter: id > 2  →  expected result [4]
+
+    Old bug (scanner pre-filters id > 2 → [3, 4], then _combine_positional_deletes sees only
+    2 rows so absolute position 2 is outside the batch range and nothing is deleted → [3, 4]).
+    """
+    from pyiceberg.expressions.visitors import bind
+
+    arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),))
+    # File row positions: 0→1, 1→2, 2→3, 3→4
+    arrow_table = pa.table([pa.array([1, 2, 3, 4], type=pa.int32())], schema=arrow_schema)
+    data_file = _write_table_to_data_file(
+        f"{tmpdir}/test_scanner_filter_not_set_with_pos_deletes.parquet", arrow_schema, arrow_table
+    )
+
+    table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))
+
+    positional_deletes = [pa.chunked_array([pa.array([2], type=pa.int64())])]
+    result_batches = list(
+        _task_to_record_batches(
+            PyArrowFileIO(),
+            FileScanTask(data_file),
+            bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True),
+            projected_schema=table_schema,
+            table_schema=table_schema,
+            projected_field_ids={1},
+            positional_deletes=positional_deletes,
+            case_sensitive=True,
+        )
+    )
+
+    assert len(result_batches) == 1
+    assert result_batches[0].column(0).to_pylist() == [4]
+
+
+def test_task_to_record_batches_filter_applied_after_positional_deletes(tmpdir: str) -> None:
+    """Regression test: the row filter must be applied *after* positional deletes are removed.
+
+    When positional deletes are present the scanner does not push down the predicate, so
+    ``_task_to_record_batches`` must apply ``pyarrow_filter`` explicitly after ``take``.
+    This test uses data where the expected result differs from both
+    "filter only" and "deletes only" projections, ensuring that skipping either step
+    would produce the wrong answer.
+    """
+    from pyiceberg.expressions.visitors import bind
+
+    arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),))
+    # File rows (0-indexed positions): 0→1, 1→2, 2→3, 3→4, 4→5
+    arrow_table = pa.table([pa.array([1, 2, 3, 4, 5], type=pa.int32())], schema=arrow_schema)
+    data_file = _write_table_to_data_file(
+        f"{tmpdir}/test_task_to_record_batches_filter_with_positional.parquet", arrow_schema, arrow_table
+    )
+
+    table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))
+
+    # Delete file-positions 1 and 3 (values 2 and 4); survivors: [1, 3, 5]
+    # Then apply filter id > 2; expected result: [3, 5]
+    #
+    # Wrong results that would indicate a bug:
+    #   [1, 3, 5] — filter not applied after deletes
+    #   [3, 4, 5] — filter applied but positional deletes not applied
+    positional_deletes = [pa.chunked_array([pa.array([1, 3], type=pa.int64())])]
+    result_batches = list(
+        _task_to_record_batches(
+            PyArrowFileIO(),
+            FileScanTask(data_file),
+            bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True),
+            projected_schema=table_schema,
+            table_schema=table_schema,
+            projected_field_ids={1},
+            positional_deletes=positional_deletes,
+            case_sensitive=True,
+        )
+    )
+
+    assert len(result_batches) == 1
+    assert result_batches[0].column(0).to_pylist() == [3, 5]
+
+
+def test_task_to_record_batches_filter_after_positional_deletes_empty_result(tmpdir: str) -> None:
+    """Regression: filter after positional deletes must not raise even when the result is empty.
+
+    PyArrow < 21 raises IndexError from RecordBatch.filter(Expression) when the result has
+    zero rows (fixed in https://github.com/apache/arrow/pull/46057).  This test ensures the
+    positional-delete path handles that case gracefully and yields no batches.
+    """
+    from pyiceberg.expressions.visitors import bind
+
+    arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),))
+    arrow_table = pa.table([pa.array([1, 2, 3], type=pa.int32())], schema=arrow_schema)
+    data_file = _write_table_to_data_file(
+        f"{tmpdir}/test_filter_after_positional_deletes_empty_result.parquet", arrow_schema, arrow_table
+    )
+
+    table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))
+
+    # No rows deleted, but filter (id > 10) eliminates all rows → must return empty
+    positional_deletes = [pa.chunked_array([pa.array([], type=pa.int64())])]
+    result_batches = list(
+        _task_to_record_batches(
+            PyArrowFileIO(),
+            FileScanTask(data_file),
+            bound_row_filter=bind(table_schema, GreaterThan("id", 10), case_sensitive=True),
+            projected_schema=table_schema,
+            table_schema=table_schema,
+            projected_field_ids={1},
+            positional_deletes=positional_deletes,
+            case_sensitive=True,
+        )
+    )
+
+    assert result_batches == []
+
+
 def test_parse_location_defaults() -> None:
     """Test that parse_location uses defaults."""
 