Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1676,22 +1676,13 @@ def _task_to_record_batches(
# Create the mask of indices that we're interested in
indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
current_batch = current_batch.take(indices)
if pyarrow_filter is not None:
current_batch = current_batch.filter(pyarrow_filter)

# skip empty batches
if current_batch.num_rows == 0:
continue

# Apply the user filter
if pyarrow_filter is not None:
# Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
table = pa.Table.from_batches([current_batch])
table = table.filter(pyarrow_filter)
# skip empty batches
if table.num_rows == 0:
continue

current_batch = table.combine_chunks().to_batches()[0]

yield _to_requested_schema(
projected_schema,
file_project_schema,
Expand Down
89 changes: 89 additions & 0 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3267,6 +3267,95 @@ def _expected_batch(unit: str) -> pa.RecordBatch:
assert _expected_batch("ns" if format_version > 2 else "us").equals(actual_result)


def test_task_to_record_batches_scanner_filter_not_set_with_positional_deletes(tmpdir: str) -> None:
"""Regression test for https://github.com/apache/iceberg-python/issues/3272.

When positional deletes are present the scanner must NOT receive the row filter as a
push-down predicate. Positional-delete indices reference absolute row positions in the
original file; if the scanner filters rows first the surviving rows shift and the
indices no longer map correctly, producing silently wrong results.

The test chooses data where the old (buggy) code path gives a distinct wrong answer:
- File rows (positions 0-3): [1, 2, 3, 4]
- Positional delete: position 2 → removes value 3 → survivors [1, 2, 4]
- Row filter: id > 2 → expected result [4]

Old bug (scanner pre-filters id > 2 → [3, 4], then _combine_positional_deletes sees only
2 rows so absolute position 2 is outside the batch range and nothing is deleted → [3, 4]).
"""
from pyiceberg.expressions.visitors import bind

arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),))
# File row positions: 0→1, 1→2, 2→3, 3→4
arrow_table = pa.table([pa.array([1, 2, 3, 4], type=pa.int32())], schema=arrow_schema)
data_file = _write_table_to_data_file(
f"{tmpdir}/test_scanner_filter_not_set_with_pos_deletes.parquet", arrow_schema, arrow_table
)

table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))

positional_deletes = [pa.chunked_array([pa.array([2], type=pa.int64())])]
result_batches = list(
_task_to_record_batches(
PyArrowFileIO(),
FileScanTask(data_file),
bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True),
projected_schema=table_schema,
table_schema=table_schema,
projected_field_ids={1},
positional_deletes=positional_deletes,
case_sensitive=True,
)
)

assert len(result_batches) == 1
assert result_batches[0].column(0).to_pylist() == [4]


def test_task_to_record_batches_filter_applied_after_positional_deletes(tmpdir: str) -> None:
"""Regression test: the row filter must be applied *after* positional deletes are removed.

When positional deletes are present the scanner does not push down the predicate, so
``_task_to_record_batches`` must apply ``pyarrow_filter`` explicitly after ``take``.
This test uses data where the expected result differs from both
"filter only" and "deletes only" projections, ensuring that skipping either step
would produce the wrong answer.
"""
from pyiceberg.expressions.visitors import bind

arrow_schema = pa.schema((pa.field("id", pa.int32(), nullable=True, metadata={PYARROW_PARQUET_FIELD_ID_KEY: "1"}),))
# File rows (0-indexed positions): 0→1, 1→2, 2→3, 3→4, 4→5
arrow_table = pa.table([pa.array([1, 2, 3, 4, 5], type=pa.int32())], schema=arrow_schema)
data_file = _write_table_to_data_file(
f"{tmpdir}/test_task_to_record_batches_filter_with_positional.parquet", arrow_schema, arrow_table
)

table_schema = Schema(NestedField(1, "id", IntegerType(), required=False))

# Delete file-positions 1 and 3 (values 2 and 4); survivors: [1, 3, 5]
# Then apply filter id >= 3; expected result: [3, 5]
#
# Wrong results that would indicate a bug:
# [1, 3, 5] — filter not applied after deletes
# [3, 4, 5] — positional deletes not applied (scanner skips filter push-down)
positional_deletes = [pa.chunked_array([pa.array([1, 3], type=pa.int64())])]
result_batches = list(
_task_to_record_batches(
PyArrowFileIO(),
FileScanTask(data_file),
bound_row_filter=bind(table_schema, GreaterThan("id", 2), case_sensitive=True),
projected_schema=table_schema,
table_schema=table_schema,
projected_field_ids={1},
positional_deletes=positional_deletes,
case_sensitive=True,
)
)

assert len(result_batches) == 1
assert result_batches[0].column(0).to_pylist() == [3, 5]


def test_parse_location_defaults() -> None:
"""Test that parse_location uses defaults."""

Expand Down