Skip to content
130 changes: 125 additions & 5 deletions paimon-python/pypaimon/tests/data_evolution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from pypaimon import CatalogFactory, Schema
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.read.read_builder import ReadBuilder
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.offset_row import OffsetRow

Expand Down Expand Up @@ -141,13 +140,63 @@ def test_basic(self):
('f1', pa.int16()),
]))
self.assertEqual(actual_data, expect_data)
self.assertEqual(
len(actual_data.schema), len(expect_data.schema),
'Read output column count must match schema')
self.assertEqual(
actual_data.schema.names, expect_data.schema.names,
'Read output column names must match schema')

def test_partitioned_read_requested_column_missing_in_file(self):
    """Read a partitioned data-evolution table where one commit omitted a column.

    Two commits are made into partition 'p1': the first writes the full
    schema (f0, f1, dt); the second writes only (f0, dt), so its data file
    contains no f1 column. The read must still surface all three schema
    columns, filling f1 with nulls for the second commit's rows rather than
    silently dropping the column. Finally, the manifest metadata is checked
    for the per-commit row-id ranges.
    """
    pa_schema = pa.schema([('f0', pa.int32()), ('f1', pa.string()), ('dt', pa.string())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        partition_keys=['dt'],
        options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
    )
    self.catalog.create_table('default.test_partition_missing_col', schema, False)
    table = self.catalog.get_table('default.test_partition_missing_col')
    wb = table.new_batch_write_builder()

    # First commit: full-schema write of two rows (expected row ids 0 and 1).
    tw1 = wb.new_write()
    tc1 = wb.new_commit()
    tw1.write_arrow(pa.Table.from_pydict(
        {'f0': [1, 2], 'f1': ['a', 'b'], 'dt': ['p1', 'p1']},
        schema=pa_schema
    ))
    tc1.commit(tw1.prepare_commit())
    tw1.close()
    tc1.close()

    # Second commit: restrict the write to (f0, dt) so the produced data
    # file physically lacks the f1 column (expected row ids 2 and 3).
    tw2 = wb.new_write().with_write_type(['f0', 'dt'])
    tc2 = wb.new_commit()
    # Row key extractor uses table column indices; pass table-ordered data with null for f1
    tw2.write_arrow(pa.Table.from_pydict(
        {'f0': [3, 4], 'f1': [None, None], 'dt': ['p1', 'p1']},
        schema=pa_schema
    ))
    tc2.commit(tw2.prepare_commit())
    tw2.close()
    tc2.close()

    # Full-table read: all three schema columns must come back, with f1
    # null for the rows from the f1-less second commit.
    actual = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
    self.assertEqual(len(actual.schema), 3, 'Must have f0, f1, dt (no silent drop when f1 missing in file)')
    self.assertEqual(actual.schema.names, ['f0', 'f1', 'dt'])
    self.assertEqual(actual.num_rows, 4)
    f1_col = actual.column('f1')
    self.assertEqual(f1_col[0].as_py(), 'a')
    self.assertEqual(f1_col[1].as_py(), 'b')
    self.assertIsNone(f1_col[2].as_py())
    self.assertIsNone(f1_col[3].as_py())

    # Assert manifest file meta contains min and max row id.
    # NOTE(review): this reads the *latest* snapshot's delta manifest list
    # but asserts the FIRST commit's range (0..1), not the second's (2..3).
    # Confirm the delta-list ordering actually guarantees element [0] is the
    # first commit's manifest; the read_all() checks below are
    # order-independent and cover both commits.
    manifest_list_manager = ManifestListManager(table)
    snapshot_manager = SnapshotManager(table)
    manifest = manifest_list_manager.read(snapshot_manager.get_latest_snapshot().delta_manifest_list)[0]
    self.assertEqual(0, manifest.min_row_id)
    self.assertEqual(1, manifest.max_row_id)
    all_manifests = manifest_list_manager.read_all(snapshot_manager.get_latest_snapshot())
    first_commit = next((m for m in all_manifests if m.min_row_id == 0 and m.max_row_id == 1), None)
    self.assertIsNotNone(first_commit, "Should have a manifest with min_row_id=0, max_row_id=1")
    second_commit = next((m for m in all_manifests if m.min_row_id == 2 and m.max_row_id == 3), None)
    self.assertIsNotNone(second_commit, "Should have a manifest with min_row_id=2, max_row_id=3")

def test_merge_reader(self):
from pypaimon.read.reader.concat_batch_reader import MergeAllBatchReader
Expand Down Expand Up @@ -280,6 +329,14 @@ def test_with_slice(self):
[2, 1001, 2001],
"with_slice(1, 4) should return id in (2, 1001, 2001). Got ids=%s" % ids,
)
scan_oob = rb.new_scan().with_slice(10, 12)
splits_oob = scan_oob.plan().splits()
result_oob = rb.new_read().to_pandas(splits_oob)
self.assertEqual(
len(result_oob),
0,
"with_slice(10, 12) on 6 rows should return 0 rows (out of bounds), got %d" % len(result_oob),
)

# Out-of-bounds slice: 6 rows total, slice(10, 12) should return 0 rows
scan_oob = rb.new_scan().with_slice(10, 12)
Expand Down Expand Up @@ -439,6 +496,8 @@ def test_multiple_appends(self):
'f2': ['b'] * 100 + ['y'] + ['d'],
}, schema=simple_pa_schema)
self.assertEqual(actual, expect)
self.assertEqual(len(actual.schema), len(expect.schema), 'Merge read output column count must match schema')
self.assertEqual(actual.schema.names, expect.schema.names, 'Merge read output column names must match schema')

def test_disorder_cols_append(self):
simple_pa_schema = pa.schema([
Expand Down Expand Up @@ -1175,6 +1234,7 @@ def test_read_row_tracking_metadata(self):
pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
]))
self.assertEqual(actual_data, expect_data)
self.assertEqual(len(actual_data.schema), len(expect_data.schema), 'Read output column count must match schema')

# write 2
table_write = write_builder.new_write().with_write_type(['f0'])
Expand Down Expand Up @@ -1210,6 +1270,66 @@ def test_read_row_tracking_metadata(self):
pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
]))
self.assertEqual(actual_data, expect_data)
self.assertEqual(len(actual_data.schema), len(expect_data.schema), 'Read output column count must match schema')

def test_with_blob(self):
    """Round-trip a blob column through a data-evolution table.

    Two single-row commits each write a serialized BlobDescriptor into the
    'picture' column ('blob-as-descriptor' mode). The read back projects
    the user columns together with the _ROW_ID and _SEQUENCE_NUMBER
    metadata columns and checks the assigned row ids.
    """
    from pypaimon.table.row.blob import BlobDescriptor

    pa_schema = pa.schema([
        ('id', pa.int32()),
        ('picture', pa.large_binary()),
    ])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'blob-as-descriptor': 'true',
        },
    )
    self.catalog.create_table('default.test_with_blob', schema, False)
    table = self.catalog.get_table('default.test_with_blob')

    # A one-byte file on disk referenced by the descriptor (offset 0, length 1).
    blob_path = os.path.join(self.tempdir, 'blob_ev')
    with open(blob_path, 'wb') as f:
        f.write(b'x')
    descriptor = BlobDescriptor(blob_path, 0, 1)

    # First commit: row id pinned to 0.
    wb = table.new_batch_write_builder()
    tw = wb.new_write()
    tc = wb.new_commit()
    tw.write_arrow(pa.Table.from_pydict(
        {'id': [1], 'picture': [descriptor.serialize()]},
        schema=pa_schema,
    ))
    cmts = tw.prepare_commit()
    # NOTE(review): first_row_id is forced manually on the prepared commit
    # messages — presumably to make the _ROW_ID assertions below
    # deterministic; confirm the writer/commit path does not assign these
    # itself for blob tables.
    if cmts and cmts[0].new_files:
        for nf in cmts[0].new_files:
            nf.first_row_id = 0
    tc.commit(cmts)
    tw.close()
    tc.close()

    # Second commit: row id pinned to 1.
    tw = wb.new_write()
    tc = wb.new_commit()
    tw.write_arrow(pa.Table.from_pydict(
        {'id': [2], 'picture': [descriptor.serialize()]},
        schema=pa_schema,
    ))
    cmts = tw.prepare_commit()
    if cmts and cmts[0].new_files:
        for nf in cmts[0].new_files:
            nf.first_row_id = 1
    tc.commit(cmts)
    tw.close()
    tc.close()

    # Read back with metadata columns projected alongside user columns.
    rb = table.new_read_builder()
    rb.with_projection(['id', '_ROW_ID', 'picture', '_SEQUENCE_NUMBER'])
    actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
    self.assertEqual(actual.num_rows, 2)
    self.assertEqual(actual.column('id').to_pylist(), [1, 2])
    self.assertEqual(actual.column('_ROW_ID').to_pylist(), [0, 1])

def test_from_arrays_without_schema(self):
schema = pa.schema([
Expand Down
Loading
Loading