diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py index a3d89d717464b..eb9d84e05b05a 100644 --- a/iotdb-client/client-py/iotdb/Session.py +++ b/iotdb-client/client-py/iotdb/Session.py @@ -1680,6 +1680,14 @@ def value_to_bytes(data_types, values): values_tobe_packed.append(b"\x0b") values_tobe_packed.append(len(value_bytes)) values_tobe_packed.append(value_bytes) + # OBJECT (binary payload, same layout as BLOB) + elif data_type == 12: + format_str_list.append("ci") + format_str_list.append(str(len(value))) + format_str_list.append("s") + values_tobe_packed.append(b"\x0c") + values_tobe_packed.append(len(value)) + values_tobe_packed.append(value) else: raise RuntimeError("Unsupported data type:" + str(data_type)) format_str = "".join(format_str_list) diff --git a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py index cc5577e885c62..1e124c49d7a36 100644 --- a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py +++ b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py @@ -73,7 +73,7 @@ def read_from_buffer(buffer, size): def read_column_types(buffer, value_column_count): data_types = np.frombuffer(buffer, dtype=np.uint8, count=value_column_count) new_buffer = buffer[value_column_count:] - if not np.all(np.isin(data_types, (0, 1, 2, 3, 4, 5, 8, 9, 10, 11))): + if not np.all(np.isin(data_types, (0, 1, 2, 3, 4, 5, 8, 9, 10, 11, 12))): raise Exception("Invalid data type encountered: " + str(data_types)) return data_types, new_buffer @@ -190,8 +190,9 @@ def deserialize_from_boolean_array(buffer, size): def read_binary_column(buffer, data_type, position_count): - if data_type != 5: - raise Exception("Invalid data type: " + data_type) + # TEXT, BLOB, STRING, OBJECT: length-prefixed binary values + if data_type not in (5, 10, 11, 12): + raise Exception("Invalid data type: " + str(data_type)) null_indicators, buffer = deserialize_null_indicators(buffer, position_count) if null_indicators is None: @@ -229,7 +230,7 @@ def read_run_length_column(buffer, data_type, position_count): def repeat(column, data_type, position_count): - if data_type in (0, 5): + if data_type in (0, 5, 10, 11, 12): if column.size == 1: return np.full( position_count, column[0], dtype=(bool if data_type == 0 else object) diff --git a/iotdb-client/client-py/iotdb/utils/Field.py b/iotdb-client/client-py/iotdb/utils/Field.py index d9a0ee77776ec..01d2e5f6c7581 100644 --- a/iotdb-client/client-py/iotdb/utils/Field.py +++ b/iotdb-client/client-py/iotdb/utils/Field.py @@ -58,6 +58,7 @@ def copy(field): output.get_data_type() == TSDataType.TEXT or output.get_data_type() == TSDataType.STRING or output.get_data_type() == TSDataType.BLOB + or output.get_data_type() == TSDataType.OBJECT ): output.set_binary_value(field.get_binary_value()) else: @@ -154,6 +155,7 @@ def get_binary_value(self): self.__data_type != TSDataType.TEXT and self.__data_type != TSDataType.STRING and self.__data_type != TSDataType.BLOB + and self.__data_type != TSDataType.OBJECT or self.value is None or self.value is pd.NA ): @@ -188,8 +190,8 @@ def get_string_value(self): # TEXT, STRING if self.__data_type == 5 or self.__data_type == 11: return self.value.decode("utf-8") - # BLOB - elif self.__data_type == 10: + # BLOB, OBJECT (raw bytes; display as hex for readability) + elif self.__data_type == 10 or self.__data_type == 12: return str(hex(int.from_bytes(self.value, byteorder="big"))) # TIMESTAMP elif self.__data_type == 8: @@ -226,7 +228,7 @@ def get_object_value(self, data_type): return parse_int_to_date(self.value) elif data_type == 5 or data_type == 11: return self.value.decode("utf-8") - elif data_type == 10: + elif data_type == 10 or data_type == 12: return self.value else: raise RuntimeError("Unsupported data type:" + str(data_type)) diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py index 4b9082b5353fd..50bacdc1258ef 100644 --- a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py +++ b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py @@ -32,6 +32,7 @@ class TSDataType(IntEnum): DATE = 9 BLOB = 10 STRING = 11 + OBJECT = 12 def np_dtype(self): return { @@ -45,6 +46,7 @@ def np_dtype(self): TSDataType.DATE: date, TSDataType.BLOB: bytes, TSDataType.STRING: str, + TSDataType.OBJECT: bytes, }[self] diff --git a/iotdb-client/client-py/iotdb/utils/NumpyTablet.py b/iotdb-client/client-py/iotdb/utils/NumpyTablet.py index 9a0860d3a43e9..0fe5c8711aa47 100644 --- a/iotdb-client/client-py/iotdb/utils/NumpyTablet.py +++ b/iotdb-client/client-py/iotdb/utils/NumpyTablet.py @@ -78,8 +78,13 @@ def __init__( if timestamps.dtype != TSDataType.INT64.np_dtype(): timestamps = timestamps.astype(TSDataType.INT64.np_dtype()) for i in range(len(values)): - if values[i].dtype != data_types[i].np_dtype(): - values[i] = values[i].astype(data_types[i].np_dtype()) + dt = data_types[i] + if dt in (TSDataType.BLOB, TSDataType.OBJECT) and values[ + i + ].dtype == np.dtype(object): + continue + if values[i].dtype != dt.np_dtype(): + values[i] = values[i].astype(dt.np_dtype()) self.__values = values self.__timestamps = timestamps @@ -141,8 +146,8 @@ def get_binary_values(self): or data_type == 8 ): bs = value.tobytes() - # TEXT, STRING, BLOB - elif data_type == 5 or data_type == 11 or data_type == 10: + # TEXT, STRING, BLOB, OBJECT + elif data_type in (5, 10, 11, 12): format_str_list = [">"] values_tobe_packed = [] for str_list in value: diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index 9c5b3ec06ddc5..5dd0de6cf9743 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -189,6 +189,7 @@ def get_typed_point(field: Field, none_value=None): TSDataType.STRING: lambda f: f.get_string_value(), TSDataType.DATE: lambda f: f.get_date_value(), TSDataType.BLOB: lambda f: f.get_binary_value(), + TSDataType.OBJECT: lambda f: f.get_binary_value(), } result_next_type: TSDataType = field.get_data_type() diff --git a/iotdb-client/client-py/iotdb/utils/Tablet.py b/iotdb-client/client-py/iotdb/utils/Tablet.py index 9b241723fe5b7..641a67d3738e7 100644 --- a/iotdb-client/client-py/iotdb/utils/Tablet.py +++ b/iotdb-client/client-py/iotdb/utils/Tablet.py @@ -189,8 +189,8 @@ def get_binary_values(self): values_tobe_packed.append(0) self.__mark_none_value(bitmaps, i, j) has_none = True - # TEXT, STRING, BLOB - elif data_type == 5 or data_type == 11 or data_type == 10: + # TEXT, STRING, BLOB, OBJECT + elif data_type in (5, 10, 11, 12): for j in range(self.__row_number): if self.__values[j][i] is not None: if isinstance(self.__values[j][i], str): diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py index 0edc76f68fd6d..ed73a8f5f4d3d 100644 --- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py +++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py @@ -35,6 +35,15 @@ TIMESTAMP_STR = "Time" +def _column_type_to_tsdata_type(raw): + """Resolve Thrift column type (name, ordinal int, or TSDataType) to TSDataType.""" + if isinstance(raw, TSDataType): + return raw + if isinstance(raw, int): + return TSDataType(raw) + return TSDataType[raw] + + class IoTDBRpcDataSet(object): def __init__( @@ -96,7 +105,7 @@ def __init__( self.__data_type_for_tsblock_column = [None] * ts_block_column_size for i in range(len(column_name_list)): name = column_name_list[i] - column_type = TSDataType[column_type_list[i]] + column_type = _column_type_to_tsdata_type(column_type_list[i]) self.__column_name_list.append(name) self.__column_type_list.append(column_type) tsblock_column_index = column_index_2_tsblock_column_index_list[ @@ -332,8 +341,8 @@ def _process_buffer(self): continue data_type = self.__data_type_for_tsblock_column[location] column_array = column_arrays[location] - # BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BLOB - if data_type in (0, 1, 2, 3, 4, 10): + # BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BLOB, OBJECT + if data_type in (0, 1, 2, 3, 4, 10, 12): data_array = column_array if ( data_type != 10 @@ -376,11 +385,12 @@ def _process_buffer(self): tmp_array = np.full( array_length, np.nan, dtype=data_type.np_dtype() ) - # TEXT, STRING, BLOB, DATE, TIMESTAMP + # TEXT, STRING, BLOB, OBJECT, DATE, TIMESTAMP elif ( data_type == 5 or data_type == 11 or data_type == 10 + or data_type == 12 or data_type == 9 or data_type == 8 ): diff --git a/iotdb-client/client-py/tests/integration/test_object_type.py b/iotdb-client/client-py/tests/integration/test_object_type.py new file mode 100644 index 0000000000000..ce2552246cc22 --- /dev/null +++ b/iotdb-client/client-py/tests/integration/test_object_type.py @@ -0,0 +1,100 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import numpy as np + +from iotdb.Session import Session +from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor +from iotdb.utils.NumpyTablet import NumpyTablet +from iotdb.utils.Tablet import Tablet +from .iotdb_container import IoTDBContainer + + +def test_object_type_tablet_read_write(): + with IoTDBContainer() as db: + session = Session(db.get_container_host_ip(), db.get_exposed_port(6667)) + session.open(False) + assert session.is_open() + + storage = "root.sg_object_py" + device = f"{storage}.d1" + measurement = "obj_m1" + path = f"{device}.{measurement}" + + session.set_storage_group(storage) + session.create_time_series( + path, + TSDataType.OBJECT, + TSEncoding.PLAIN, + Compressor.SNAPPY, + ) + + payloads = [b"\x01\x02\x03", b"payload-b", b""] + timestamps = [100, 200, 300] + values = [[p] for p in payloads] + + tablet = Tablet( + device, + [measurement], + [TSDataType.OBJECT], + values, + timestamps, + ) + session.insert_tablet(tablet) + + np_tablet = NumpyTablet( + device, + [measurement], + [TSDataType.OBJECT], + [np.array([b"\xaa\xbb", b"\xcc"], dtype=object)], + np.array([400, 500], dtype=TSDataType.INT64.np_dtype()), + ) + session.insert_tablet(np_tablet) + + session.insert_records( + [device, device], + [600, 700], + [[measurement], [measurement]], + [[TSDataType.OBJECT], [TSDataType.OBJECT]], + [[b"\xde\xad"], [b"\xbe\xef"]], + ) + + with session.execute_query_statement( + f"select {measurement} from {device}" + ) as dataset: + rows = [] + while dataset.has_next(): + rows.append(dataset.next()) + assert len(rows) == 7 + for i, row in enumerate(rows): + f = row.get_fields()[0] + assert f.get_data_type() == TSDataType.OBJECT + assert ( + f.get_binary_value() + == [ + payloads[0], + payloads[1], + payloads[2], + b"\xaa\xbb", + b"\xcc", + b"\xde\xad", + b"\xbe\xef", + ][i] + ) + assert f.get_object_value(TSDataType.OBJECT) == f.get_binary_value() + + session.close() diff --git a/iotdb-client/client-py/tests/unit/test_numpy_tablet.py b/iotdb-client/client-py/tests/unit/test_numpy_tablet.py index 217df22ea2d3d..2c777ff3421ac 100644 --- a/iotdb-client/client-py/tests/unit/test_numpy_tablet.py +++ b/iotdb-client/client-py/tests/unit/test_numpy_tablet.py @@ -61,6 +61,27 @@ def test_numpy_tablet_serialization(): assert tablet_.get_binary_values() == np_tablet_.get_binary_values() +def test_object_column_tablet_matches_numpy_tablet(): + measurements_ = ["obj"] + data_types_ = [TSDataType.OBJECT] + values_ = [[b"\x01\x02"], [b"ab"], [b""]] + timestamps_ = [1, 2, 3] + tablet_ = Tablet( + "root.sg_test_object.d1", measurements_, data_types_, values_, timestamps_ + ) + np_values_ = [np.array([b"\x01\x02", b"ab", b""], dtype=object)] + np_timestamps_ = np.array([1, 2, 3], dtype=">i8") + np_tablet_ = NumpyTablet( + "root.sg_test_object.d1", + measurements_, + data_types_, + np_values_, + np_timestamps_, + ) + assert tablet_.get_binary_timestamps() == np_tablet_.get_binary_timestamps() + assert tablet_.get_binary_values() == np_tablet_.get_binary_values() + + def test_numpy_tablet_with_none_serialization(): measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]