Skip to content

Commit f24cf39

Browse files
[python] Fix the token-merge issue in REST file IO and blob-as-descriptor file IO, and add a REST catalog blob-as-descriptor sample (#7009)
1 parent 7b62ef6 commit f24cf39

5 files changed

Lines changed: 234 additions & 113 deletions

File tree

paimon-python/pypaimon/api/rest_util.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,24 @@ def extract_prefix_map(
4343
new_key = key[len(prefix):]
4444
result[new_key] = str(value)
4545
return result
46+
47+
@staticmethod
48+
def merge(
49+
base_properties: Dict[str, str],
50+
override_properties: Dict[str, str]) -> Dict[str, str]:
51+
if override_properties is None:
52+
override_properties = {}
53+
if base_properties is None:
54+
base_properties = {}
55+
56+
result = {}
57+
58+
for key, value in base_properties.items():
59+
if value is not None and key not in override_properties:
60+
result[key] = value
61+
62+
for key, value in override_properties.items():
63+
if value is not None:
64+
result[key] = value
65+
66+
return result

paimon-python/pypaimon/catalog/rest/rest_token_file_io.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from pyarrow._fs import FileSystem
2424

2525
from pypaimon.api.rest_api import RESTApi
26+
from pypaimon.api.rest_util import RESTUtil
2627
from pypaimon.catalog.rest.rest_token import RESTToken
2728
from pypaimon.common.file_io import FileIO
2829
from pypaimon.common.identifier import Identifier
@@ -60,8 +61,17 @@ def __setstate__(self, state):
6061
def _initialize_oss_fs(self, path) -> FileSystem:
    """Build an OSS filesystem from the catalog options overlaid with the
    (possibly refreshed) REST token, without permanently mutating
    ``self.properties``.

    :param path: the path the filesystem is being initialized for
    :return: the filesystem created by the parent implementation
    """
    self.try_to_refresh_token()
    token_options = self._merge_token_with_catalog_options(self.token.token)
    base_map = self.properties.to_map() if self.properties else {}

    # Temporarily swap in the merged view so the parent initializer sees
    # both catalog options and token credentials; the original options
    # object is always restored, even if initialization raises.
    saved_properties = self.properties
    self.properties = Options(RESTUtil.merge(base_map, token_options))
    try:
        return super()._initialize_oss_fs(path)
    finally:
        self.properties = saved_properties
6575

6676
def _merge_token_with_catalog_options(self, token: dict) -> dict:
6777
"""Merge token with catalog options, DLF OSS endpoint should override the standard OSS endpoint."""

paimon-python/pypaimon/sample/oss_blob_as_descriptor.py

Lines changed: 0 additions & 111 deletions
This file was deleted.
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
"""
19+
Sample demonstrating how to use blob-as-descriptor mode with REST catalog.
20+
"""
21+
from pypaimon import CatalogFactory
22+
import pyarrow as pa
23+
24+
from pypaimon import Schema
25+
from pypaimon.table.row.blob import BlobDescriptor, Blob
26+
from pypaimon.common.file_io import FileIO
27+
from pypaimon.common.options import Options
28+
29+
30+
def write_table_with_blob(catalog, video_file_path: str, external_oss_options: dict):
    """Create a blob-as-descriptor table and write one row that references
    an external OSS file instead of embedding its bytes.

    Args:
        catalog: An opened paimon catalog (a REST catalog in this sample).
        video_file_path: OSS URI of the external file to reference.
        external_oss_options: Credentials/endpoint for the external OSS bucket.

    Returns:
        The fully-qualified ``database.table`` identifier that was created.

    Raises:
        FileNotFoundError: If the external OSS file cannot be accessed.
    """
    import time  # local import keeps the sample function self-contained

    database_name = 'blob_demo'
    # Timestamp suffix keeps repeated sample runs from colliding.
    table_name = 'test_table_blob_' + str(int(time.time()))

    catalog.create_database(
        name=database_name,
        ignore_if_exists=True,
    )

    pa_schema = pa.schema([
        ('text', pa.string()),
        ('names', pa.list_(pa.string())),
        ('video', pa.large_binary())  # Blob column
    ])

    schema = Schema.from_pyarrow_schema(
        pa_schema=pa_schema,
        partition_keys=None,
        primary_keys=None,
        options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
            'blob-field': 'video',
            'blob-as-descriptor': 'true'
        },
        comment='Table with blob column using blob-as-descriptor mode')

    table_identifier = f'{database_name}.{table_name}'
    catalog.create_table(
        identifier=table_identifier,
        schema=schema,
        ignore_if_exists=True
    )

    table = catalog.get_table(table_identifier)
    print(f"✓ Table created: {table_identifier}")

    # Access external OSS file to get file size
    try:
        external_file_io = FileIO(video_file_path, Options(external_oss_options))
        video_file_size = external_file_io.get_file_size(video_file_path)
    except Exception as e:
        raise FileNotFoundError(
            f"Failed to access external OSS file: {video_file_path}\n"
            f"Error: {e}\n"
            f"Please check your external_oss_options credentials."
        ) from e

    # In blob-as-descriptor mode the table stores a serialized pointer
    # (uri, offset, length) rather than the blob bytes themselves.
    blob_descriptor = BlobDescriptor(video_file_path, 0, video_file_size)
    descriptor_bytes = blob_descriptor.serialize()
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()

    table_write.write_arrow(pa.Table.from_pydict({
        'text': ['Sample video'],
        'names': [['video1.mp4']],
        'video': [descriptor_bytes]
    }, schema=pa_schema))

    table_commit.commit(table_write.prepare_commit())
    print("✓ Data committed successfully")
    table_write.close()
    table_commit.close()

    return table_identifier
97+
98+
99+
def read_table_with_blob(catalog, table_name: str):
    """Read the table back and resolve the first non-null blob descriptor
    to verify the external bytes are reachable.

    Args:
        catalog: An opened paimon catalog.
        table_name: Fully-qualified ``database.table`` identifier.

    Returns:
        The pyarrow table produced by scanning all splits.
    """
    # Hoisted out of the row loop: the import and the reader are
    # loop-invariant, so build them once.
    from pypaimon.common.uri_reader import FileUriReader

    table = catalog.get_table(table_name)

    read_builder = table.new_read_builder()
    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
    table_read = read_builder.new_read()

    result = table_read.to_arrow(splits)
    print(f"✓ Read {result.num_rows} rows")

    uri_reader = FileUriReader(table.file_io)
    for video_bytes in result.column('video').to_pylist():
        if video_bytes is None:
            continue
        # Descriptor -> Blob resolves the external bytes on demand.
        blob_descriptor = BlobDescriptor.deserialize(video_bytes)
        blob = Blob.from_descriptor(uri_reader, blob_descriptor)
        blob_data = blob.to_data()
        print(f"✓ Blob data verified: {len(blob_data) / 1024 / 1024:.2f} MB")
        break  # only the first non-null blob is verified

    return result
123+
124+
125+
if __name__ == '__main__':
    # Credentials for the *external* OSS bucket that holds the raw blob file.
    external_oss_conf = {
        'fs.oss.accessKeyId': "YOUR_EXTERNAL_OSS_ACCESS_KEY_ID",
        'fs.oss.accessKeySecret': "YOUR_EXTERNAL_OSS_ACCESS_KEY_SECRET",
        'fs.oss.endpoint': "oss-cn-hangzhou.aliyuncs.com",
        'fs.oss.region': "cn-hangzhou",
    }

    blob_source_path = "oss://your-bucket/blob_test/video.mov"

    # REST (DLF) catalog connection options; the external OSS options are
    # spread in so the table's file IO can also reach the blob bucket.
    rest_catalog_conf = {
        'metastore': 'rest',
        'uri': "http://your-rest-catalog-uri",
        'warehouse': "your_warehouse",
        'dlf.region': 'cn-hangzhou',
        "token.provider": "dlf",
        'dlf.access-key-id': "YOUR_DLF_ACCESS_KEY_ID",
        'dlf.access-key-secret': "YOUR_DLF_ACCESS_KEY_SECRET",
        'dlf.oss-endpoint': "oss-cn-hangzhou.aliyuncs.com",
        **external_oss_conf
    }

    demo_catalog = CatalogFactory.create(rest_catalog_conf)

    try:
        # Write one row pointing at the external file, then read it back.
        created_table = write_table_with_blob(demo_catalog, blob_source_path, external_oss_conf)
        result = read_table_with_blob(demo_catalog, created_table)
        print("✓ Test completed successfully!")
    except Exception as e:
        print(f'✗ Error: {e}')
        raise

paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import os
1919
import pickle
2020
import tempfile
21+
import time
2122
import unittest
2223
from unittest.mock import patch
2324

@@ -192,6 +193,51 @@ def test_dlf_oss_endpoint_overrides_token_endpoint(self):
192193
"Other token properties should be preserved"
193194
)
194195

196+
def test_catalog_options_not_modified(self):
    """Initializing an OSS filesystem must not mutate the catalog options
    the file IO was constructed with; token entries are merged into a
    separate, temporary view instead."""
    from pypaimon.api.rest_util import RESTUtil
    from pypaimon.catalog.rest.rest_token import RESTToken
    from pyarrow.fs import LocalFileSystem

    catalog_options = Options({
        CatalogOptions.URI.key(): "http://test-uri",
        "custom.key": "custom.value"
    })
    # Snapshot taken up front so we can detect any later mutation.
    options_snapshot = Options(catalog_options.to_map())

    with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
        file_io = RESTTokenFileIO(
            self.identifier,
            self.warehouse_path,
            catalog_options
        )

    token_properties = {
        OssOptions.OSS_ACCESS_KEY_ID.key(): "token-access-key",
        OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key",
        OssOptions.OSS_ENDPOINT.key(): "token-endpoint"
    }
    # Token expiring one hour from now, so no refresh is triggered.
    file_io.token = RESTToken(token_properties, int(time.time() * 1000) + 3600000)

    # Stub the parent initializer: the test cares only about side effects
    # on the options, not about real OSS connectivity.
    with patch.object(FileIO, '_initialize_oss_fs', return_value=LocalFileSystem()):
        file_io._initialize_oss_fs("file:///test/path")

    self.assertEqual(
        catalog_options.to_map(),
        options_snapshot.to_map(),
        "Original catalog_options should not be modified"
    )

    # The merged view still exposes both catalog and token entries.
    merged_properties = RESTUtil.merge(
        catalog_options.to_map(),
        file_io._merge_token_with_catalog_options(token_properties)
    )
    self.assertIn("custom.key", merged_properties)
    self.assertEqual(merged_properties["custom.key"], "custom.value")
    self.assertIn(OssOptions.OSS_ACCESS_KEY_ID.key(), merged_properties)
    self.assertEqual(merged_properties[OssOptions.OSS_ACCESS_KEY_ID.key()], "token-access-key")
195241

196242
if __name__ == '__main__':
197243
unittest.main()

0 commit comments

Comments
 (0)