From dc5174f960942450ad595e67fa9f95f738687763 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 06:04:33 +0200 Subject: [PATCH 01/13] Add b2o carrier support for persisted C2Array objects --- src/blosc2/__init__.py | 1 + .../{_msgpack_utils.py => b2objects.py} | 163 ++++++++---------- src/blosc2/batch_store.py | 2 +- src/blosc2/c2array.py | 32 ++++ src/blosc2/core.py | 6 +- src/blosc2/msgpack_utils.py | 92 ++++++++++ src/blosc2/schunk.py | 5 +- src/blosc2/vlarray.py | 2 +- tests/test_b2objects.py | 83 +++++++++ tests/test_batch_store.py | 2 +- 10 files changed, 289 insertions(+), 99 deletions(-) rename src/blosc2/{_msgpack_utils.py => b2objects.py} (52%) create mode 100644 src/blosc2/msgpack_utils.py create mode 100644 tests/test_b2objects.py diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index 3eb96ce4..f8044258 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -538,6 +538,7 @@ def _raise(exc): from .batch_store import Batch, BatchStore from .vlarray import VLArray, vlarray_from_cframe from .ref import Ref +from .b2objects import _open_b2object from .c2array import c2context, C2Array, URLPath diff --git a/src/blosc2/_msgpack_utils.py b/src/blosc2/b2objects.py similarity index 52% rename from src/blosc2/_msgpack_utils.py rename to src/blosc2/b2objects.py index 9cdc8df4..72e29ca2 100644 --- a/src/blosc2/_msgpack_utils.py +++ b/src/blosc2/b2objects.py @@ -12,50 +12,57 @@ import linecache import textwrap from dataclasses import asdict +from typing import Any import numpy as np -from msgpack import ExtType, packb, unpackb -from blosc2 import blosc2_ext +import blosc2 from blosc2.dsl_kernel import DSLKernel -from blosc2.ref import Ref -# Msgpack extension type codes are application-defined. Reserve code 42 in -# python-blosc2 for values serialized as Blosc2 CFrames via ``to_cframe()`` and -# reconstructed with ``blosc2.from_cframe()``. Keep this stable for backward -# compatibility with persisted msgpack payloads produced by this package. -_BLOSC2_EXT_CODE = 42 -# Reserve code 43 for structured Blosc2 reference objects that are not naturally -# serialized as CFrames. The payload is a msgpack-encoded mapping with a -# stable ``kind`` and ``version`` envelope. -_BLOSC2_STRUCTURED_EXT_CODE = 43 -_BLOSC2_STRUCTURED_VERSION = 1 -_BLOSC2_DSL_VERSION = 1 +_B2OBJECT_META_KEY = "b2o" +_B2OBJECT_VERSION = 1 +_B2OBJECT_DSL_VERSION = 1 + + +def _make_b2object_carrier( + kind: str, + shape, + dtype, + *, + chunks=None, + blocks=None, + **kwargs, +): + meta = dict(kwargs.pop("meta", {})) + meta[_B2OBJECT_META_KEY] = {"kind": kind, "version": _B2OBJECT_VERSION} + kwargs["meta"] = meta + return blosc2.empty(shape=shape, dtype=dtype, chunks=chunks, blocks=blocks, **kwargs) + + +def _write_b2object_payload(array, payload: dict[str, Any]) -> None: + array.schunk.vlmeta[_B2OBJECT_META_KEY] = payload def _encode_operand_reference(obj): - return Ref.from_object(obj).to_dict() + return blosc2.Ref.from_object(obj).to_dict() + +def _decode_operand_reference(payload): + return blosc2.Ref.from_dict(payload).open() -def _encode_structured_reference(obj): - import blosc2 - if isinstance(obj, blosc2.Ref): - payload = {"kind": "ref", "version": _BLOSC2_STRUCTURED_VERSION, "ref": obj.to_dict()} - return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) +def _encode_b2object_payload(obj) -> dict[str, Any] | None: if isinstance(obj, blosc2.C2Array): - payload = _encode_operand_reference(obj) - return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) + return blosc2.Ref.c2array_ref(obj.path, obj.urlbase).to_dict() if isinstance(obj, blosc2.LazyExpr): expression = obj.expression_tosave if hasattr(obj, "expression_tosave") else obj.expression operands = obj.operands_tosave if hasattr(obj, "operands_tosave") else obj.operands - payload = { + return { "kind": "lazyexpr", - "version": _BLOSC2_STRUCTURED_VERSION, + "version": _B2OBJECT_VERSION, "expression": expression, "operands": {key: _encode_operand_reference(value) for key, value in operands.items()}, } - return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) if isinstance(obj, blosc2.LazyUDF): if not isinstance(obj.func, DSLKernel): raise TypeError("Structured Blosc2 msgpack payload only supports LazyUDF backed by DSLKernel") @@ -75,15 +82,11 @@ def _encode_structured_reference(obj): kwargs[key] = asdict(value) else: kwargs[key] = value - # Keep both source forms: - # - udf_source recreates the executable Python function object - # - dsl_source preserves the DSLKernel's normalized DSL metadata so the - # reconstructed function can keep its DSL identity and fast-path hints - payload = { + return { "kind": "lazyudf", - "version": _BLOSC2_STRUCTURED_VERSION, + "version": _B2OBJECT_VERSION, "function_kind": "dsl", - "dsl_version": _BLOSC2_DSL_VERSION, + "dsl_version": _B2OBJECT_DSL_VERSION, "name": udf_name, "udf_source": udf_source, "dsl_source": obj.func.dsl_source, @@ -92,76 +95,60 @@ def _encode_structured_reference(obj): "operands": {f"o{i}": _encode_operand_reference(value) for i, value in enumerate(obj.inputs)}, "kwargs": kwargs, } - return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) return None -def _decode_operand_reference(payload): - return Ref.from_dict(payload).open() - - -def _decode_structured_reference(data): - payload = unpackb(data) - if not isinstance(payload, dict): - raise TypeError("Structured Blosc2 msgpack payload must decode to a mapping") - - version = payload.get("version") - if version != _BLOSC2_STRUCTURED_VERSION: - raise ValueError(f"Unsupported structured Blosc2 msgpack payload version: {version!r}") - +def _decode_b2object_payload(payload: dict[str, Any]): kind = payload.get("kind") - if kind == "ref": - ref_payload = payload.get("ref") - return Ref.from_dict(ref_payload) + version = payload.get("version") + if version != _B2OBJECT_VERSION: + raise ValueError(f"Unsupported persisted Blosc2 object version: {version!r}") if kind == "c2array": - return _decode_operand_reference(payload) + ref = blosc2.Ref.from_dict(payload) + return ref.open() if kind == "lazyexpr": return _decode_structured_lazyexpr(payload) if kind == "lazyudf": return _decode_structured_lazyudf(payload) - raise ValueError(f"Unsupported structured Blosc2 msgpack payload kind: {kind!r}") + raise ValueError(f"Unsupported persisted Blosc2 object kind: {kind!r}") def _decode_structured_lazyexpr(payload): - import blosc2 - expression = payload.get("expression") if not isinstance(expression, str): - raise TypeError("Structured LazyExpr msgpack payload requires a string 'expression'") + raise TypeError("Structured LazyExpr payload requires a string 'expression'") operands_payload = payload.get("operands") if not isinstance(operands_payload, dict): - raise TypeError("Structured LazyExpr msgpack payload requires a mapping 'operands'") + raise TypeError("Structured LazyExpr payload requires a mapping 'operands'") operands = {key: _decode_operand_reference(value) for key, value in operands_payload.items()} return blosc2.lazyexpr(expression, operands=operands) def _decode_structured_lazyudf(payload): - import blosc2 - function_kind = payload.get("function_kind") if function_kind != "dsl": raise ValueError(f"Unsupported structured LazyUDF function kind: {function_kind!r}") dsl_version = payload.get("dsl_version") - if dsl_version != _BLOSC2_DSL_VERSION: + if dsl_version != _B2OBJECT_DSL_VERSION: raise ValueError(f"Unsupported structured LazyUDF DSL version: {dsl_version!r}") udf_source = payload.get("udf_source") if not isinstance(udf_source, str): - raise TypeError("Structured LazyUDF msgpack payload requires a string 'udf_source'") + raise TypeError("Structured LazyUDF payload requires a string 'udf_source'") name = payload.get("name") if not isinstance(name, str): - raise TypeError("Structured LazyUDF msgpack payload requires a string 'name'") + raise TypeError("Structured LazyUDF payload requires a string 'name'") dtype = payload.get("dtype") if not isinstance(dtype, str): - raise TypeError("Structured LazyUDF msgpack payload requires a string 'dtype'") + raise TypeError("Structured LazyUDF payload requires a string 'dtype'") shape_payload = payload.get("shape") if not isinstance(shape_payload, list): - raise TypeError("Structured LazyUDF msgpack payload requires a list 'shape'") + raise TypeError("Structured LazyUDF payload requires a list 'shape'") operands_payload = payload.get("operands") if not isinstance(operands_payload, dict): - raise TypeError("Structured LazyUDF msgpack payload requires a mapping 'operands'") + raise TypeError("Structured LazyUDF payload requires a mapping 'operands'") kwargs = payload.get("kwargs", {}) if not isinstance(kwargs, dict): - raise TypeError("Structured LazyUDF msgpack payload requires a mapping 'kwargs'") + raise TypeError("Structured LazyUDF payload requires a mapping 'kwargs'") local_ns = {} filename = f"<{name}>" @@ -185,38 +172,26 @@ def _decode_structured_lazyudf(payload): return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs) -def _encode_msgpack_ext(obj): - import blosc2 - - if isinstance( - obj, blosc2.NDArray | blosc2.SChunk | blosc2.VLArray | blosc2.BatchStore | blosc2.EmbedStore - ): - return ExtType(_BLOSC2_EXT_CODE, obj.to_cframe()) - structured = _encode_structured_reference(obj) - if structured is not None: - return structured - return blosc2_ext.encode_tuple(obj) - - -def msgpack_packb(value): - return packb(value, default=_encode_msgpack_ext, strict_types=True, use_bin_type=True) - - -def decode_tuple_list_hook(obj): - if obj and isinstance(obj[0], str) and obj[0] == "__tuple__": - return tuple(obj[1:]) - return obj +def _read_b2object_marker(obj) -> dict[str, Any] | None: + schunk = getattr(obj, "schunk", obj) + if _B2OBJECT_META_KEY not in schunk.meta: + return None + return schunk.meta[_B2OBJECT_META_KEY] -def _decode_msgpack_ext(code, data): - import blosc2 +def _read_b2object_payload(obj) -> dict[str, Any]: + schunk = getattr(obj, "schunk", obj) + return schunk.vlmeta[_B2OBJECT_META_KEY] - if code == _BLOSC2_EXT_CODE: - return blosc2.from_cframe(data, copy=True) - if code == _BLOSC2_STRUCTURED_EXT_CODE: - return _decode_structured_reference(data) - return ExtType(code, data) +def _open_b2object(obj): + marker = _read_b2object_marker(obj) + if marker is None: + return None -def msgpack_unpackb(payload): - return unpackb(payload, list_hook=decode_tuple_list_hook, ext_hook=_decode_msgpack_ext) + payload = _read_b2object_payload(obj) + if marker.get("version") != _B2OBJECT_VERSION: + raise ValueError(f"Unsupported persisted Blosc2 object version: {marker.get('version')!r}") + if marker.get("kind") != payload.get("kind"): + raise ValueError("Persisted Blosc2 object marker/payload kind mismatch") + return _decode_b2object_payload(payload) diff --git a/src/blosc2/batch_store.py b/src/blosc2/batch_store.py index 23d7f0e3..4b93d670 100644 --- a/src/blosc2/batch_store.py +++ b/src/blosc2/batch_store.py @@ -18,8 +18,8 @@ import numpy as np import blosc2 -from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb from blosc2.info import InfoReporter, format_nbytes_info +from blosc2.msgpack_utils import msgpack_packb, msgpack_unpackb _BATCHSTORE_META = {"version": 1, "serializer": "msgpack", "items_per_block": None, "arrow_schema": None} _SUPPORTED_SERIALIZERS = {"msgpack", "arrow"} diff --git a/src/blosc2/c2array.py b/src/blosc2/c2array.py index 11f7f6cb..1fb4696b 100644 --- a/src/blosc2/c2array.py +++ b/src/blosc2/c2array.py @@ -18,6 +18,7 @@ import requests import blosc2 +from blosc2.b2objects import _encode_b2object_payload, _make_b2object_carrier, _write_b2object_payload from blosc2.info import InfoReporter, format_nbytes_info _subscriber_data = { @@ -238,6 +239,37 @@ def __init__(self, path: str, /, urlbase: str | None = None, auth_token: str | N cparams.pop("filters, meta", None) self._cparams = blosc2.CParams(**cparams) + def _to_b2object_payload(self) -> dict: + payload = _encode_b2object_payload(self) + if payload is None: + raise TypeError("Unsupported persisted Blosc2 object") + return payload + + def _to_b2object_carrier(self, **kwargs): + array = _make_b2object_carrier( + "c2array", + self.shape, + self.dtype, + chunks=self.chunks, + blocks=self.blocks, + cparams=self.cparams, + **kwargs, + ) + _write_b2object_payload(array, self._to_b2object_payload()) + return array + + def to_cframe(self) -> bytes: + """Serialize the remote array reference as a CFrame-backed Blosc2 object.""" + return self._to_b2object_carrier().to_cframe() + + def save(self, urlpath: str, contiguous: bool = True, **kwargs) -> None: + """Persist the remote array reference using a CFrame-backed carrier.""" + blosc2.blosc2_ext.check_access_mode(urlpath, "w") + kwargs["urlpath"] = urlpath + kwargs["contiguous"] = contiguous + kwargs["mode"] = "w" + self._to_b2object_carrier(**kwargs) + def __getitem__(self, slice_: int | slice | Sequence[slice]) -> np.ndarray: """ Get a slice of the array (returning NumPy array). diff --git a/src/blosc2/core.py b/src/blosc2/core.py index d574a21e..2e95b861 100644 --- a/src/blosc2/core.py +++ b/src/blosc2/core.py @@ -1918,7 +1918,9 @@ def ndarray_from_cframe(cframe: bytes | str, copy: bool = False) -> blosc2.NDArr def from_cframe( cframe: bytes | str, copy: bool = True -) -> blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.BatchStore | blosc2.VLArray: +) -> ( + blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.BatchStore | blosc2.VLArray | blosc2.C2Array +): """Create a :ref:`EmbedStore `, :ref:`NDArray `, :ref:`SChunk `, :ref:`BatchStore ` or :ref:`VLArray ` instance from a contiguous frame buffer. @@ -1956,6 +1958,8 @@ def from_cframe( return blosc2.BatchStore(_from_schunk=schunk_from_cframe(cframe, copy=copy)) if "vlarray" in schunk.meta: return blosc2.vlarray_from_cframe(cframe, copy=copy) + if "b2o" in schunk.meta: + return blosc2._open_b2object(ndarray_from_cframe(cframe, copy=copy)) if "b2nd" in schunk.meta: return ndarray_from_cframe(cframe, copy=copy) return schunk_from_cframe(cframe, copy=copy) diff --git a/src/blosc2/msgpack_utils.py b/src/blosc2/msgpack_utils.py new file mode 100644 index 00000000..1d3e5112 --- /dev/null +++ b/src/blosc2/msgpack_utils.py @@ -0,0 +1,92 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +from msgpack import ExtType, packb, unpackb + +from blosc2 import blosc2_ext +from blosc2.b2objects import _decode_b2object_payload, _encode_b2object_payload +from blosc2.ref import Ref + +# Msgpack extension type codes are application-defined. Reserve code 42 in +# python-blosc2 for values serialized as Blosc2 CFrames via ``to_cframe()`` and +# reconstructed with ``blosc2.from_cframe()``. Keep this stable for backward +# compatibility with persisted msgpack payloads produced by this package. +_BLOSC2_EXT_CODE = 42 +# Reserve code 43 for structured Blosc2 reference objects that are not naturally +# serialized as CFrames. The payload is a msgpack-encoded mapping with a +# stable ``kind`` and ``version`` envelope. +_BLOSC2_STRUCTURED_EXT_CODE = 43 +_BLOSC2_STRUCTURED_VERSION = 1 + + +def _encode_structured_reference(obj): + import blosc2 + + if isinstance(obj, blosc2.Ref): + payload = {"kind": "ref", "version": _BLOSC2_STRUCTURED_VERSION, "ref": obj.to_dict()} + return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) + payload = _encode_b2object_payload(obj) + if payload is not None: + return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) + return None + + +def _decode_structured_reference(data): + payload = unpackb(data) + if not isinstance(payload, dict): + raise TypeError("Structured Blosc2 msgpack payload must decode to a mapping") + + version = payload.get("version") + if version != _BLOSC2_STRUCTURED_VERSION: + raise ValueError(f"Unsupported structured Blosc2 msgpack payload version: {version!r}") + + kind = payload.get("kind") + if kind == "ref": + ref_payload = payload.get("ref") + return Ref.from_dict(ref_payload) + if kind in {"c2array", "lazyexpr", "lazyudf"}: + return _decode_b2object_payload(payload) + raise ValueError(f"Unsupported structured Blosc2 msgpack payload kind: {kind!r}") + + +def _encode_msgpack_ext(obj): + import blosc2 + + if isinstance( + obj, blosc2.NDArray | blosc2.SChunk | blosc2.VLArray | blosc2.BatchStore | blosc2.EmbedStore + ): + return ExtType(_BLOSC2_EXT_CODE, obj.to_cframe()) + structured = _encode_structured_reference(obj) + if structured is not None: + return structured + return blosc2_ext.encode_tuple(obj) + + +def msgpack_packb(value): + return packb(value, default=_encode_msgpack_ext, strict_types=True, use_bin_type=True) + + +def decode_tuple_list_hook(obj): + if obj and isinstance(obj[0], str) and obj[0] == "__tuple__": + return tuple(obj[1:]) + return obj + + +def _decode_msgpack_ext(code, data): + import blosc2 + + if code == _BLOSC2_EXT_CODE: + return blosc2.from_cframe(data, copy=True) + if code == _BLOSC2_STRUCTURED_EXT_CODE: + return _decode_structured_reference(data) + return ExtType(code, data) + + +def msgpack_unpackb(payload): + return unpackb(payload, list_hook=decode_tuple_list_hook, ext_hook=_decode_msgpack_ext) diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index 95c9df72..a9da58cf 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -19,8 +19,8 @@ import blosc2 from blosc2 import SpecialValue, blosc2_ext -from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb from blosc2.info import InfoReporter, format_nbytes_info +from blosc2.msgpack_utils import msgpack_packb, msgpack_unpackb class vlmeta(MutableMapping, blosc2_ext.vlmeta): @@ -1634,6 +1634,9 @@ def _process_opened_object(res): elif not proxy_src["caterva2_env"]: raise RuntimeError("Could not find the source when opening a Proxy") + if "b2o" in meta: + return blosc2._open_b2object(res) + if "vlarray" in meta: from blosc2.vlarray import VLArray diff --git a/src/blosc2/vlarray.py b/src/blosc2/vlarray.py index 569ffce0..4f9f3401 100644 --- a/src/blosc2/vlarray.py +++ b/src/blosc2/vlarray.py @@ -12,8 +12,8 @@ from typing import TYPE_CHECKING, Any import blosc2 -from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb from blosc2.info import InfoReporter, format_nbytes_info +from blosc2.msgpack_utils import msgpack_packb, msgpack_unpackb if TYPE_CHECKING: from collections.abc import Iterator diff --git a/tests/test_b2objects.py b/tests/test_b2objects.py new file mode 100644 index 00000000..70f94070 --- /dev/null +++ b/tests/test_b2objects.py @@ -0,0 +1,83 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +import numpy as np + +import blosc2 +import blosc2.c2array as blosc2_c2array + + +def _make_c2array( + monkeypatch, + path="@public/examples/ds-1d.b2nd", + urlbase="https://cat2.cloud/demo/", + auth_token=None, + shape=(5,), + chunks=(5,), + blocks=(5,), + dtype=np.int64, +): + dtype = np.dtype(dtype) + + def fake_info(path_, urlbase_, params=None, headers=None, model=None, auth_token=None): + return { + "shape": list(shape), + "chunks": list(chunks), + "blocks": list(blocks), + "dtype": np.dtype(dtype).str, + "schunk": { + "cparams": dict(blosc2.cparams_dflts), + "nbytes": int(np.prod(shape)) * np.dtype(dtype).itemsize, + "cbytes": int(np.prod(shape)) * np.dtype(dtype).itemsize, + "cratio": 1.0, + "blocksize": int(np.prod(blocks)) * np.dtype(dtype).itemsize, + "vlmeta": {}, + }, + } + + monkeypatch.setattr(blosc2_c2array, "info", fake_info) + return blosc2.C2Array(path, urlbase=urlbase, auth_token=auth_token) + + +def test_c2array_from_cframe_roundtrip(monkeypatch): + original = _make_c2array(monkeypatch, auth_token="secret-token") + carrier = blosc2.ndarray_from_cframe(original.to_cframe()) + + assert carrier.schunk.meta["b2o"] == {"kind": "c2array", "version": 1} + assert carrier.schunk.vlmeta["b2o"] == { + "kind": "c2array", + "version": 1, + "path": original.path, + "urlbase": original.urlbase, + } + + restored = blosc2.from_cframe(original.to_cframe()) + + assert isinstance(restored, blosc2.C2Array) + assert restored.path == original.path + assert restored.urlbase == original.urlbase + assert restored.auth_token is None + assert restored.shape == original.shape + assert restored.dtype == original.dtype + + +def test_c2array_open_roundtrip(tmp_path, monkeypatch): + original = _make_c2array(monkeypatch, shape=(8,), chunks=(4,), blocks=(2,)) + urlpath = tmp_path / "remote-array.b2o" + + original.save(urlpath) + restored = blosc2.open(urlpath, mode="r") + + assert isinstance(restored, blosc2.C2Array) + assert restored.path == original.path + assert restored.urlbase == original.urlbase + assert restored.auth_token is None + assert restored.shape == original.shape + assert restored.chunks == original.chunks + assert restored.blocks == original.blocks diff --git a/tests/test_batch_store.py b/tests/test_batch_store.py index 7c693732..ea27d809 100644 --- a/tests/test_batch_store.py +++ b/tests/test_batch_store.py @@ -10,7 +10,7 @@ import blosc2 import blosc2.c2array as blosc2_c2array -from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb +from blosc2.msgpack_utils import msgpack_packb, msgpack_unpackb @blosc2.dsl_kernel From 493c8051276c984961d9189c5ca1ebb2ffb21967 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 06:28:29 +0200 Subject: [PATCH 02/13] Add b2o carrier support for persisted LazyExpr objects --- src/blosc2/lazyexpr.py | 64 +++++++++--------------- tests/data/legacy_lazyexpr_v1/a.b2nd | Bin 0 -> 282 bytes tests/data/legacy_lazyexpr_v1/b.b2nd | Bin 0 -> 282 bytes tests/data/legacy_lazyexpr_v1/expr.b2nd | Bin 0 -> 346 bytes tests/test_b2objects.py | 49 +++++++++++++++++- 5 files changed, 72 insertions(+), 41 deletions(-) create mode 100644 tests/data/legacy_lazyexpr_v1/a.b2nd create mode 100644 tests/data/legacy_lazyexpr_v1/b.b2nd create mode 100644 tests/data/legacy_lazyexpr_v1/expr.b2nd diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index c6893686..4755199d 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -42,6 +42,7 @@ import blosc2 +from .b2objects import _encode_b2object_payload, _make_b2object_carrier, _write_b2object_payload from .dsl_kernel import DSLKernel, DSLSyntaxError, _DSLValidator, specialize_miniexpr_inputs if blosc2._HAS_NUMBA: @@ -570,7 +571,7 @@ def convert_inputs(inputs): return [] inputs_ = [] for obj in inputs: - if not isinstance(obj, (np.ndarray, blosc2.Operand)) and not np.isscalar(obj): + if not isinstance(obj, np.ndarray | blosc2.Operand) and not np.isscalar(obj): try: obj = blosc2.SimpleProxy(obj) except Exception: @@ -2855,7 +2856,7 @@ def result_type( # Follow NumPy rules for scalar-array operations # Create small arrays with the same dtypes and let NumPy's type promotion determine the result type arrs = [ - (np.array(value).dtype if isinstance(value, (str, bytes)) else value) + (np.array(value).dtype if isinstance(value, str | bytes) else value) if (np.isscalar(value) or not hasattr(value, "dtype")) else np.array([0], dtype=_convert_dtype(value.dtype)) for value in arrays_and_dtypes @@ -2902,7 +2903,7 @@ def __init__(self, new_op): # noqa: C901 # Check that operands are proper Operands, LazyArray or scalars; if not, convert to NDArray objects value1 = ( blosc2.SimpleProxy(value1) - if not (isinstance(value1, (blosc2.Operand, np.ndarray)) or np.isscalar(value1)) + if not (isinstance(value1, blosc2.Operand | np.ndarray) or np.isscalar(value1)) else value1 ) # Reset values represented as np.int64 etc. to be set as Python natives @@ -2926,7 +2927,7 @@ def __init__(self, new_op): # noqa: C901 return value2 = ( blosc2.SimpleProxy(value2) - if not (isinstance(value2, (blosc2.Operand, np.ndarray)) or np.isscalar(value2)) + if not (isinstance(value2, blosc2.Operand | np.ndarray) or np.isscalar(value2)) else value2 ) # Reset values represented as np.int64 etc. to be set as Python natives @@ -3711,44 +3712,27 @@ def save(self, urlpath=None, **kwargs): if urlpath is None: raise ValueError("To save a LazyArray you must provide an urlpath") - expression = self.expression_tosave if hasattr(self, "expression_tosave") else self.expression - operands_ = self.operands_tosave if hasattr(self, "operands_tosave") else self.operands - # Validate expression - validate_expr(expression) - - meta = kwargs.get("meta", {}) - meta["LazyArray"] = LazyArrayEnum.Expr.value kwargs["urlpath"] = urlpath - kwargs["meta"] = meta kwargs["mode"] = "w" # always overwrite the file in urlpath + self._to_b2object_carrier(**kwargs) - # Create an empty array; useful for providing the shape and dtype of the outcome - array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs) - - # Save the expression and operands in the metadata - operands = {} - for key, value in operands_.items(): - if isinstance(value, blosc2.C2Array): - operands[key] = { - "path": str(value.path), - "urlbase": value.urlbase, - } - continue - if isinstance(value, blosc2.Proxy): - # Take the required info from the Proxy._cache container - value = value._cache - if not hasattr(value, "schunk"): - raise ValueError( - "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" - ) - if value.schunk.urlpath is None: - raise ValueError("To save a LazyArray, all operands must be stored on disk/network") - operands[key] = value.schunk.urlpath - array.schunk.vlmeta["_LazyArray"] = { - "expression": expression, - "UDF": None, - "operands": operands, - } + def to_cframe(self) -> bytes: + return self._to_b2object_carrier().to_cframe() + + def _to_b2object_carrier(self, **kwargs): + payload = _encode_b2object_payload(self) + if payload is None: + raise TypeError("Unsupported persisted Blosc2 object") + array = _make_b2object_carrier( + "lazyexpr", + self.shape, + self.dtype, + chunks=self.chunks, + blocks=self.blocks, + **kwargs, + ) + _write_b2object_payload(array, payload) + return array @classmethod def _new_expr(cls, expression, operands, guess, out=None, where=None, ne_args=None): @@ -3764,7 +3748,7 @@ def _new_expr(cls, expression, operands, guess, out=None, where=None, ne_args=No _operands = operands | local_vars # Check that operands are proper Operands, LazyArray or scalars; if not, convert to NDArray objects for op, val in _operands.items(): - if not (isinstance(val, (blosc2.Operand, np.ndarray)) or np.isscalar(val)): + if not (isinstance(val, blosc2.Operand | np.ndarray) or np.isscalar(val)): _operands[op] = blosc2.SimpleProxy(val) # for scalars just return value (internally converts to () if necessary) opshapes = {k: v if not hasattr(v, "shape") else v.shape for k, v in _operands.items()} diff --git a/tests/data/legacy_lazyexpr_v1/a.b2nd b/tests/data/legacy_lazyexpr_v1/a.b2nd new file mode 100644 index 0000000000000000000000000000000000000000..a1eaef4787ebd17d05a12f2a1531c436d3bc8f08 GIT binary patch literal 282 zcmbQYBFQMNC^0vc;SvJ_!=&>-0tgtTmIyI~GF^u77&KrETaY3S5TOAkE;4Wc(V-h` zV1a0h9z`kO5%8 d2xB1VDU3iDvfX1~xWmA34k)aA10e@x0RYOXC%ymx literal 0 HcmV?d00001 diff --git a/tests/data/legacy_lazyexpr_v1/b.b2nd b/tests/data/legacy_lazyexpr_v1/b.b2nd new file mode 100644 index 0000000000000000000000000000000000000000..c19f3013a5aaadad232d3eb1cfecce66be7c4345 GIT binary patch literal 282 zcmbQYBFQMNC^0vc;SvJ_!=&>-0tgtTmIyI~GF^u77&KrETaY3S5TOAkE;4Wc(V-h` zV1a0h9z`kO5%8 d2xB1VDU3iDvfX1~xWmA34k)aA10e@x0RYYDC&&N* literal 0 HcmV?d00001 diff --git a/tests/data/legacy_lazyexpr_v1/expr.b2nd b/tests/data/legacy_lazyexpr_v1/expr.b2nd new file mode 100644 index 0000000000000000000000000000000000000000..1a94137670c89168d8080f2cad754cd026242002 GIT binary patch literal 346 zcmbQYBFQMNC^0vc;SvJ_!}9Y$0tgtRmIyI~GF^u77&KrEup$nSN)0e^k%0q<4&Pt{ zD`aGas%K!Be3n7s9s|>oB%{0(kjko+K8aP8jzvX@l_2iII}A+6fV9$d2F8gn!&xVS zq@X0jZ6KT3Ces3>jFEwrk(Gl3$O2&vAVvcQP=4bSMxf1N_ZS#g#iLj#bBBQuWKu9I zV>_cUkO9KMP+bg24sTwST2WAxT3no&pSLtoL0chdaj1*iffe}$sYQu-DaB0-6PG3G WfxNXaX;~7OaSmuq_YIg?a2f!?Qb<_< literal 0 HcmV?d00001 diff --git a/tests/test_b2objects.py b/tests/test_b2objects.py index 70f94070..11870fa5 100644 --- a/tests/test_b2objects.py +++ b/tests/test_b2objects.py @@ -7,6 +7,8 @@ from __future__ import annotations +from pathlib import Path + import numpy as np import blosc2 @@ -69,7 +71,7 @@ def test_c2array_from_cframe_roundtrip(monkeypatch): def test_c2array_open_roundtrip(tmp_path, monkeypatch): original = _make_c2array(monkeypatch, shape=(8,), chunks=(4,), blocks=(2,)) - urlpath = tmp_path / "remote-array.b2o" + urlpath = tmp_path / "remote-array.b2nd" original.save(urlpath) restored = blosc2.open(urlpath, mode="r") @@ -81,3 +83,48 @@ def test_c2array_open_roundtrip(tmp_path, monkeypatch): assert restored.shape == original.shape assert restored.chunks == original.chunks assert restored.blocks == original.blocks + + +def test_lazyexpr_from_cframe_roundtrip(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=tmp_path / "a.b2nd", mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=tmp_path / "b.b2nd", mode="w") + expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b}) + carrier = blosc2.ndarray_from_cframe(expr.to_cframe()) + + assert carrier.schunk.meta["b2o"] == {"kind": "lazyexpr", "version": 1} + assert carrier.schunk.vlmeta["b2o"] == { + "kind": "lazyexpr", + "version": 1, + "expression": "a + b", + "operands": { + "a": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "a.b2nd")}, + "b": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "b.b2nd")}, + }, + } + + restored = blosc2.from_cframe(expr.to_cframe()) + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3) + + +def test_lazyexpr_open_roundtrip(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=tmp_path / "a.b2nd", mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=tmp_path / "b.b2nd", mode="w") + expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b}) + urlpath = tmp_path / "expr.b2nd" + + expr.save(urlpath) + restored = blosc2.open(urlpath, mode="r") + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3) + + +def test_legacy_lazyexpr_open_backward_compat(): + fixture = Path(__file__).parent / "data" / "legacy_lazyexpr_v1" / "expr.b2nd" + + restored = blosc2.open(fixture, mode="r") + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3) From 1eb3dd756d795226e372e55d5b6ccbd8d749970f Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 06:51:40 +0200 Subject: [PATCH 03/13] Normalize internal helper and module names across blosc2 --- src/blosc2/__init__.py | 6 +- src/blosc2/b2objects.py | 40 ++++++------ src/blosc2/blosc2_ext.pyx | 2 +- src/blosc2/c2array.py | 8 +-- src/blosc2/core.py | 2 +- src/blosc2/dict_store.py | 10 +-- src/blosc2/dsl_kernel.py | 12 ++-- src/blosc2/lazyexpr.py | 77 ++++++++++++------------ src/blosc2/msgpack_utils.py | 6 +- src/blosc2/ndarray.py | 48 +++++++-------- src/blosc2/proxy.py | 6 +- src/blosc2/schunk.py | 8 +-- src/blosc2/utils.py | 16 ++--- src/blosc2/{_wasm_jit.py => wasm_jit.py} | 6 +- 14 files changed, 124 insertions(+), 123 deletions(-) rename src/blosc2/{_wasm_jit.py => wasm_jit.py} (98%) diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index f8044258..8ba00c5e 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -253,7 +253,7 @@ class FPAccuracy(Enum): The C-Blosc2 version's string.""" if IS_WASM: - from ._wasm_jit import init_wasm_jit_helpers + from .wasm_jit import init_wasm_jit_helpers _WASM_MINIEXPR_ENABLED = init_wasm_jit_helpers() @@ -538,7 +538,7 @@ def _raise(exc): from .batch_store import Batch, BatchStore from .vlarray import VLArray, vlarray_from_cframe from .ref import Ref -from .b2objects import _open_b2object +from .b2objects import open_b2object from .c2array import c2context, C2Array, URLPath @@ -549,7 +549,7 @@ def _raise(exc): lazyexpr, LazyArray, LazyUDF, - _open_lazyarray, + open_lazyarray, get_expr_operands, validate_expr, evaluate, diff --git a/src/blosc2/b2objects.py b/src/blosc2/b2objects.py index 72e29ca2..5f02119c 100644 --- a/src/blosc2/b2objects.py +++ b/src/blosc2/b2objects.py @@ -24,7 +24,7 @@ _B2OBJECT_DSL_VERSION = 1 -def _make_b2object_carrier( +def make_b2object_carrier( kind: str, shape, dtype, @@ -39,19 +39,19 @@ def _make_b2object_carrier( return blosc2.empty(shape=shape, dtype=dtype, chunks=chunks, blocks=blocks, **kwargs) -def _write_b2object_payload(array, payload: dict[str, Any]) -> None: +def write_b2object_payload(array, payload: dict[str, Any]) -> None: array.schunk.vlmeta[_B2OBJECT_META_KEY] = payload -def _encode_operand_reference(obj): +def encode_operand_reference(obj): return blosc2.Ref.from_object(obj).to_dict() -def _decode_operand_reference(payload): +def decode_operand_reference(payload): return blosc2.Ref.from_dict(payload).open() -def _encode_b2object_payload(obj) -> dict[str, Any] | None: +def encode_b2object_payload(obj) -> dict[str, Any] | None: if isinstance(obj, blosc2.C2Array): return blosc2.Ref.c2array_ref(obj.path, obj.urlbase).to_dict() if isinstance(obj, blosc2.LazyExpr): @@ -61,7 +61,7 @@ def _encode_b2object_payload(obj) -> dict[str, Any] | None: "kind": "lazyexpr", "version": _B2OBJECT_VERSION, "expression": expression, - "operands": {key: _encode_operand_reference(value) for key, value in operands.items()}, + "operands": {key: encode_operand_reference(value) for key, value in operands.items()}, } if isinstance(obj, blosc2.LazyUDF): if not isinstance(obj.func, DSLKernel): @@ -92,13 +92,13 @@ def _encode_b2object_payload(obj) -> dict[str, Any] | None: "dsl_source": obj.func.dsl_source, "dtype": np.dtype(obj.dtype).str, "shape": list(obj.shape), - "operands": {f"o{i}": _encode_operand_reference(value) for i, value in enumerate(obj.inputs)}, + "operands": {f"o{i}": encode_operand_reference(value) for i, value in enumerate(obj.inputs)}, "kwargs": kwargs, } return None -def _decode_b2object_payload(payload: dict[str, Any]): +def decode_b2object_payload(payload: dict[str, Any]): kind = payload.get("kind") version = payload.get("version") if version != _B2OBJECT_VERSION: @@ -107,24 +107,24 @@ def _decode_b2object_payload(payload: dict[str, Any]): ref = blosc2.Ref.from_dict(payload) return ref.open() if kind == "lazyexpr": - return _decode_structured_lazyexpr(payload) + return decode_structured_lazyexpr(payload) if kind == "lazyudf": - return _decode_structured_lazyudf(payload) + return decode_structured_lazyudf(payload) raise ValueError(f"Unsupported persisted Blosc2 object kind: {kind!r}") -def _decode_structured_lazyexpr(payload): +def decode_structured_lazyexpr(payload): expression = payload.get("expression") if not isinstance(expression, str): raise TypeError("Structured LazyExpr payload requires a string 'expression'") operands_payload = payload.get("operands") if not isinstance(operands_payload, dict): raise TypeError("Structured LazyExpr payload requires a mapping 'operands'") - operands = {key: _decode_operand_reference(value) for key, value in operands_payload.items()} + operands = {key: decode_operand_reference(value) for key, value in operands_payload.items()} return blosc2.lazyexpr(expression, operands=operands) -def _decode_structured_lazyudf(payload): +def decode_structured_lazyudf(payload): function_kind = payload.get("function_kind") if function_kind != "dsl": raise ValueError(f"Unsupported structured LazyUDF function kind: {function_kind!r}") @@ -167,31 +167,31 @@ def _decode_structured_lazyudf(payload): func.dsl_source = dsl_source operands = tuple( - _decode_operand_reference(operands_payload[f"o{n}"]) for n in range(len(operands_payload)) + decode_operand_reference(operands_payload[f"o{n}"]) for n in range(len(operands_payload)) ) return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs) -def _read_b2object_marker(obj) -> dict[str, Any] | None: +def read_b2object_marker(obj) -> dict[str, Any] | None: schunk = getattr(obj, "schunk", obj) if _B2OBJECT_META_KEY not in schunk.meta: return None return schunk.meta[_B2OBJECT_META_KEY] -def _read_b2object_payload(obj) -> dict[str, Any]: +def read_b2object_payload(obj) -> dict[str, Any]: schunk = getattr(obj, "schunk", obj) return schunk.vlmeta[_B2OBJECT_META_KEY] -def _open_b2object(obj): - marker = _read_b2object_marker(obj) +def open_b2object(obj): + marker = read_b2object_marker(obj) if marker is None: return None - payload = _read_b2object_payload(obj) + payload = read_b2object_payload(obj) if marker.get("version") != _B2OBJECT_VERSION: raise ValueError(f"Unsupported persisted Blosc2 object version: {marker.get('version')!r}") if marker.get("kind") != payload.get("kind"): raise ValueError("Persisted Blosc2 object marker/payload kind mismatch") - return _decode_b2object_payload(payload) + return decode_b2object_payload(payload) diff --git a/src/blosc2/blosc2_ext.pyx b/src/blosc2/blosc2_ext.pyx index 8a442eb7..2ba002e5 100644 --- a/src/blosc2/blosc2_ext.pyx +++ b/src/blosc2/blosc2_ext.pyx @@ -726,7 +726,7 @@ def destroy(): blosc2_destroy() -def _register_wasm_jit_helpers(uintptr_t instantiate_ptr, uintptr_t free_ptr): +def register_wasm_jit_helpers(uintptr_t instantiate_ptr, uintptr_t free_ptr): cdef me_wasm_jit_instantiate_helper instantiate_helper = ( instantiate_ptr ) diff --git a/src/blosc2/c2array.py b/src/blosc2/c2array.py index 1fb4696b..af0229c4 100644 --- a/src/blosc2/c2array.py +++ b/src/blosc2/c2array.py @@ -18,7 +18,7 @@ import requests import blosc2 -from blosc2.b2objects import _encode_b2object_payload, _make_b2object_carrier, _write_b2object_payload +from blosc2.b2objects import encode_b2object_payload, make_b2object_carrier, write_b2object_payload from blosc2.info import InfoReporter, format_nbytes_info _subscriber_data = { @@ -240,13 +240,13 @@ def __init__(self, path: str, /, urlbase: str | None = None, auth_token: str | N self._cparams = blosc2.CParams(**cparams) def _to_b2object_payload(self) -> dict: - payload = _encode_b2object_payload(self) + payload = encode_b2object_payload(self) if payload is None: raise TypeError("Unsupported persisted Blosc2 object") return payload def _to_b2object_carrier(self, **kwargs): - array = _make_b2object_carrier( + array = make_b2object_carrier( "c2array", self.shape, self.dtype, @@ -255,7 +255,7 @@ def _to_b2object_carrier(self, **kwargs): cparams=self.cparams, **kwargs, ) - _write_b2object_payload(array, self._to_b2object_payload()) + write_b2object_payload(array, self._to_b2object_payload()) return array def to_cframe(self) -> bytes: diff --git a/src/blosc2/core.py b/src/blosc2/core.py index 2e95b861..e3d9d4ed 100644 --- a/src/blosc2/core.py +++ b/src/blosc2/core.py @@ -1959,7 +1959,7 @@ def from_cframe( if "vlarray" in schunk.meta: return blosc2.vlarray_from_cframe(cframe, copy=copy) if "b2o" in schunk.meta: - return blosc2._open_b2object(ndarray_from_cframe(cframe, copy=copy)) + return blosc2.open_b2object(ndarray_from_cframe(cframe, copy=copy)) if "b2nd" in schunk.meta: return ndarray_from_cframe(cframe, copy=copy) return schunk_from_cframe(cframe, copy=copy) diff --git a/src/blosc2/dict_store.py b/src/blosc2/dict_store.py index 6c112504..d88ce4a4 100644 --- a/src/blosc2/dict_store.py +++ b/src/blosc2/dict_store.py @@ -19,7 +19,7 @@ import blosc2 from blosc2.c2array import C2Array from blosc2.embed_store import EmbedStore -from blosc2.schunk import SChunk, _process_opened_object +from blosc2.schunk import SChunk, process_opened_object if TYPE_CHECKING: from collections.abc import Iterator, Set @@ -227,7 +227,7 @@ def _opened_external_kind( rel_path: str, ) -> str | None: """Return the supported external leaf kind for an already opened object.""" - processed = _process_opened_object(opened) + processed = process_opened_object(opened) if isinstance(processed, blosc2.BatchStore): kind = "batchstore" elif isinstance(processed, blosc2.VLArray): @@ -425,7 +425,7 @@ def __getitem__( mmap_mode=self.mmap_mode, dparams=self.dparams, ) - return self._annotate_external_value(key, _process_opened_object(opened)) + return self._annotate_external_value(key, process_opened_object(opened)) else: urlpath = os.path.join(self.working_dir, filepath) if os.path.exists(urlpath): @@ -501,7 +501,7 @@ def values(self) -> Iterator[blosc2.NDArray | SChunk | C2Array]: offset = self.offsets[filepath]["offset"] yield self._annotate_external_value( key, - _process_opened_object( + process_opened_object( blosc2.blosc2_ext.open( self.b2z_path, mode="r", @@ -541,7 +541,7 @@ def items(self) -> Iterator[tuple[str, blosc2.NDArray | SChunk | C2Array]]: key, self._annotate_external_value( key, - _process_opened_object( + process_opened_object( blosc2.blosc2_ext.open( self.b2z_path, mode="r", diff --git a/src/blosc2/dsl_kernel.py b/src/blosc2/dsl_kernel.py index 59c64186..429d833c 100644 --- a/src/blosc2/dsl_kernel.py +++ b/src/blosc2/dsl_kernel.py @@ -257,7 +257,7 @@ def specialize_dsl_miniexpr_inputs(expr_string: str, operands: dict): return specialize_miniexpr_inputs(expr_string, operands) -class _DSLValidator: +class DSLValidator: _binop_map: ClassVar[dict[type[ast.operator], str]] = { ast.Add: "+", ast.Sub: "-", @@ -526,7 +526,7 @@ def _extract_dsl(self, func, validate: bool = True): if dsl_func is None: raise ValueError("No function definition found in sliced DSL source") if validate: - _DSLValidator(dsl_source).validate(dsl_func) + DSLValidator(dsl_source).validate(dsl_func) input_names = self._input_names_from_signature(dsl_func) if _PRINT_DSL_KERNEL: func_name = getattr(func, "__name__", "") @@ -613,7 +613,7 @@ def validate_dsl(func): } -class _DSLBuilder: +class DSLBuilder: _binop_map: ClassVar[dict[type[ast.operator], str]] = { ast.Add: "+", ast.Sub: "-", @@ -852,9 +852,9 @@ def _cmpop(self, op: ast.cmpop) -> str: raise ValueError("Unsupported comparison in DSL") -class _DSLReducer: - _binop_map: ClassVar[dict[type[ast.operator], str]] = _DSLBuilder._binop_map - _cmp_map: ClassVar[dict[type[ast.cmpop], str]] = _DSLBuilder._cmp_map +class DSLReducer: + _binop_map: ClassVar[dict[type[ast.operator], str]] = DSLBuilder._binop_map + _cmp_map: ClassVar[dict[type[ast.cmpop], str]] = DSLBuilder._cmp_map def __init__(self, max_unroll: int = 64): self._env: dict[str, str] = {} diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 4755199d..773f348f 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -42,23 +42,23 @@ import blosc2 -from .b2objects import _encode_b2object_payload, _make_b2object_carrier, _write_b2object_payload -from .dsl_kernel import DSLKernel, DSLSyntaxError, _DSLValidator, specialize_miniexpr_inputs +from .b2objects import encode_b2object_payload, make_b2object_carrier, write_b2object_payload +from .dsl_kernel import DSLKernel, DSLSyntaxError, DSLValidator, specialize_miniexpr_inputs if blosc2._HAS_NUMBA: import numba + from blosc2 import compute_chunks_blocks from blosc2.info import InfoReporter -from .proxy import _convert_dtype +from .proxy import convert_dtype from .utils import ( - _format_expr_scalar, - _get_chunk_operands, - _sliced_chunk_iter, check_smaller_shape, compute_smaller_slice, constructors, elementwise_funcs, + format_expr_scalar, + get_chunk_operands, get_chunks_idx, get_intersecting_chunks, infer_shape, @@ -69,6 +69,7 @@ process_key, reducers, safe_numpy_globals, + sliced_chunk_iter, try_miniexpr, ) @@ -276,7 +277,7 @@ def get_expr_globals(expression): if not hasattr(enum, "member"): # copy-pasted from Lib/enum.py - class _mymember: + class MyMember: """ Forces item to become an Enum member during class creation. """ @@ -284,7 +285,7 @@ class _mymember: def __init__(self, value): self.value = value else: - _mymember = enum.member # only available after python 3.11 + MyMember = enum.member # only available after python 3.11 class ReduceOp(Enum): @@ -294,25 +295,25 @@ class ReduceOp(Enum): # wrap as enum.member so that Python doesn't treat some funcs # as class methods (rather than Enum members) - SUM = _mymember(np.add) - PROD = _mymember(np.multiply) - MEAN = _mymember(np.mean) - STD = _mymember(np.std) - VAR = _mymember(np.var) + SUM = MyMember(np.add) + PROD = MyMember(np.multiply) + MEAN = MyMember(np.mean) + STD = MyMember(np.std) + VAR = MyMember(np.var) # Computing a median from partial results is not straightforward because the median # is a positional statistic, which means it depends on the relative ordering of all # the data points. Unlike statistics such as the sum or mean, you can't compute a median # from partial results without knowing the entire dataset, and this is way too expensive # for arrays that cannot typically fit in-memory (e.g. disk-based NDArray). # MEDIAN = np.median - MAX = _mymember(np.maximum) - MIN = _mymember(np.minimum) - ANY = _mymember(np.any) - ALL = _mymember(np.all) - ARGMAX = _mymember(np.argmax) - ARGMIN = _mymember(np.argmin) - CUMULATIVE_SUM = _mymember(npcumsum) - CUMULATIVE_PROD = _mymember(npcumprod) + MAX = MyMember(np.maximum) + MIN = MyMember(np.minimum) + ANY = MyMember(np.any) + ALL = MyMember(np.all) + ARGMAX = MyMember(np.argmax) + ARGMIN = MyMember(np.argmin) + CUMULATIVE_SUM = MyMember(npcumsum) + CUMULATIVE_PROD = MyMember(npcumprod) class LazyArrayEnum(Enum): @@ -1117,7 +1118,7 @@ async def async_read_chunks(arrs, info, queue): my_chunk_iter = range(arrs[0].schunk.nchunks) if len(info) == 5: if info[-1] is not None: - my_chunk_iter = _sliced_chunk_iter(chunks_, (), shape, axis=info[-1], nchunk=True) + my_chunk_iter = sliced_chunk_iter(chunks_, (), shape, axis=info[-1], nchunk=True) info = info[:4] for i, nchunk in enumerate(my_chunk_iter): futures = [ @@ -1291,7 +1292,7 @@ def _format_dsl_parse_error_hint(expr_text: str, backend_msg: str): line_no = expr_text.count("\n", 0, err_pos) + 1 line_start = expr_text.rfind("\n", 0, err_pos) + 1 col_no = err_pos - line_start + 1 - dump = _DSLValidator(expr_text)._format_source_with_pointer(line_no, col_no) + dump = DSLValidator(expr_text)._format_source_with_pointer(line_no, col_no) return f"Parse error location (line {line_no}, col {col_no}, offset {err_pos}):\n{dump}" @@ -1788,7 +1789,7 @@ def slices_eval( # noqa: C901 ndindex.ndindex(cslice).as_subindex(_slice).raw ) # in the case _slice=(), just gives cslice - _get_chunk_operands(operands, cslice, chunk_operands, shape) + get_chunk_operands(operands, cslice, chunk_operands, shape) if out is None: shape_ = shape_slice if shape_slice is not None else shape @@ -2337,7 +2338,7 @@ def reduce_slices( # noqa: C901 axis=reduce_args["axis"] if np.isscalar(reduce_args["axis"]) else None, ) else: - _get_chunk_operands(operands, cslice, chunk_operands, shape) + get_chunk_operands(operands, cslice, chunk_operands, shape) if reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}: reduced_slice = ( @@ -2858,7 +2859,7 @@ def result_type( arrs = [ (np.array(value).dtype if isinstance(value, str | bytes) else value) if (np.isscalar(value) or not hasattr(value, "dtype")) - else np.array([0], dtype=_convert_dtype(value.dtype)) + else np.array([0], dtype=convert_dtype(value.dtype)) for value in arrays_and_dtypes ] return np.result_type(*arrs) @@ -2945,15 +2946,15 @@ def __init__(self, new_op): # noqa: C901 elif op in funcs_2args: if np.isscalar(value1) and np.isscalar(value2): self.expression = "o0" - svalue1 = _format_expr_scalar(value1) - svalue2 = _format_expr_scalar(value2) + svalue1 = format_expr_scalar(value1) + svalue2 = format_expr_scalar(value2) self.operands = {"o0": ne_evaluate(f"{op}({svalue1}, {svalue2})")} # eager evaluation elif np.isscalar(value2): self.operands = {"o0": value1} - self.expression = f"{op}(o0, {_format_expr_scalar(value2)})" + self.expression = f"{op}(o0, {format_expr_scalar(value2)})" elif np.isscalar(value1): self.operands = {"o0": value2} - self.expression = f"{op}({_format_expr_scalar(value1)}, o0)" + self.expression = f"{op}({format_expr_scalar(value1)}, o0)" else: self.operands = {"o0": value1, "o1": value2} self.expression = f"{op}(o0, o1)" @@ -3027,9 +3028,9 @@ def update_expr(self, new_op): # noqa: C901 def_operands = value1.operands elif isinstance(value1, LazyExpr): if np.isscalar(value2): - v2 = _format_expr_scalar(value2) + v2 = format_expr_scalar(value2) elif hasattr(value2, "shape") and value2.shape == (): - v2 = _format_expr_scalar(value2[()]) + v2 = format_expr_scalar(value2[()]) else: operand_to_key = {id(v): k for k, v in value1.operands.items()} try: @@ -3048,9 +3049,9 @@ def update_expr(self, new_op): # noqa: C901 def_operands = value1.operands else: if np.isscalar(value1): - v1 = _format_expr_scalar(value1) + v1 = format_expr_scalar(value1) elif hasattr(value1, "shape") and value1.shape == (): - v1 = _format_expr_scalar(value1[()]) + v1 = format_expr_scalar(value1[()]) else: operand_to_key = {id(v): k for k, v in value2.operands.items()} try: @@ -3720,10 +3721,10 @@ def to_cframe(self) -> bytes: return self._to_b2object_carrier().to_cframe() def _to_b2object_carrier(self, **kwargs): - payload = _encode_b2object_payload(self) + payload = encode_b2object_payload(self) if payload is None: raise TypeError("Unsupported persisted Blosc2 object") - array = _make_b2object_carrier( + array = make_b2object_carrier( "lazyexpr", self.shape, self.dtype, @@ -3731,7 +3732,7 @@ def _to_b2object_carrier(self, **kwargs): blocks=self.blocks, **kwargs, ) - _write_b2object_payload(array, payload) + write_b2object_payload(array, payload) return array @classmethod @@ -4397,7 +4398,7 @@ def _reconstruct_lazyudf(expr, lazyarray, operands_dict, array): ) -def _open_lazyarray(array): +def open_lazyarray(array): value = array.schunk.meta["LazyArray"] lazyarray = array.schunk.vlmeta["_LazyArray"] if value == LazyArrayEnum.Expr.value: diff --git a/src/blosc2/msgpack_utils.py b/src/blosc2/msgpack_utils.py index 1d3e5112..0dba1d9d 100644 --- a/src/blosc2/msgpack_utils.py +++ b/src/blosc2/msgpack_utils.py @@ -10,7 +10,7 @@ from msgpack import ExtType, packb, unpackb from blosc2 import blosc2_ext -from blosc2.b2objects import _decode_b2object_payload, _encode_b2object_payload +from blosc2.b2objects import decode_b2object_payload, encode_b2object_payload from blosc2.ref import Ref # Msgpack extension type codes are application-defined. Reserve code 42 in @@ -31,7 +31,7 @@ def _encode_structured_reference(obj): if isinstance(obj, blosc2.Ref): payload = {"kind": "ref", "version": _BLOSC2_STRUCTURED_VERSION, "ref": obj.to_dict()} return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) - payload = _encode_b2object_payload(obj) + payload = encode_b2object_payload(obj) if payload is not None: return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) return None @@ -51,7 +51,7 @@ def _decode_structured_reference(data): ref_payload = payload.get("ref") return Ref.from_dict(ref_payload) if kind in {"c2array", "lazyexpr", "lazyudf"}: - return _decode_b2object_payload(payload) + return decode_b2object_payload(payload) raise ValueError(f"Unsupported structured Blosc2 msgpack payload kind: {kind!r}") diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index 4c35cef6..c972e03a 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -34,10 +34,10 @@ from .linalg import matmul from .utils import ( - _get_local_slice, - _get_selection, - _incomplete_lazyfunc, get_chunks_idx, + get_local_slice, + get_selection, + incomplete_lazyfunc, npbinvert, nplshift, nprshift, @@ -157,7 +157,7 @@ def is_inside_new_expr() -> bool: """ # Get the current call stack stack = inspect.stack() - return builtins.any(frame_info.function in {"_new_expr", "_open_lazyarray"} for frame_info in stack) + return builtins.any(frame_info.function in {"_new_expr", "open_lazyarray"} for frame_info in stack) def make_key_hashable(key): @@ -1834,7 +1834,7 @@ def imag(ndarr: blosc2.Array, /) -> blosc2.LazyExpr: return blosc2.LazyExpr(new_op=(ndarr, "imag", None)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def contains(ndarr: blosc2.Array, value: str | bytes | blosc2.Array, /) -> blosc2.LazyExpr: """ Check if the array contains a specified value. @@ -3060,7 +3060,7 @@ def remainder( return blosc2.LazyExpr(new_op=(x1, "%", x2)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def clip( x: blosc2.Array, min: int | float | blosc2.Array | None = None, @@ -3102,7 +3102,7 @@ def chunkwise_clip(inputs, output, offset): return blosc2.lazyudf(chunkwise_clip, (x, min, max), dtype=dtype, shape=shape, **kwargs) -@_incomplete_lazyfunc +@incomplete_lazyfunc def logaddexp(x1: int | float | blosc2.Array, x2: int | float | blosc2.Array, **kwargs: Any) -> NDArray: """ Calculates the logarithm of the sum of exponentiations log(exp(x1) + exp(x2)) for @@ -4036,7 +4036,7 @@ def get_fselection_numpy(self, key: list | np.ndarray) -> np.ndarray: # now all indices are slices or arrays of integers (or booleans) # # moreover, all arrays are consecutive (otherwise an error is raised) - if np.all([isinstance(s, (slice, np.ndarray)) for s in _slice]) and np.all( + if np.all([isinstance(s, slice | np.ndarray) for s in _slice]) and np.all( [s.dtype is not bool for s in _slice if isinstance(s, np.ndarray)] ): chunks = np.array(chunks) @@ -4123,14 +4123,14 @@ def get_fselection_numpy(self, key: list | np.ndarray) -> np.ndarray: # loop over chunks coming from slices before and after array indices for cprior_tuple in product(*cprior_slices): - out_prior_selection, prior_selection, loc_prior_selection = _get_selection( + out_prior_selection, prior_selection, loc_prior_selection = get_selection( cprior_tuple, prior_tuple, chunks[:begin] ) for cpost_tuple in product(*cpost_slices): - out_post_selection, post_selection, loc_post_selection = _get_selection( + out_post_selection, post_selection, loc_post_selection = get_selection( cpost_tuple, post_tuple, chunks[end:] ) - locbegin, locend = _get_local_slice( + locbegin, locend = get_local_slice( prior_selection, post_selection, (chunk_begin, chunk_end) ) to_be_loaded = np.empty(locend - locbegin, dtype=self.dtype) @@ -4202,10 +4202,10 @@ def _get_set_nonunit_steps(self, _slice, out=None, value=None): slice_to_chunktuple(s, c) for s, c in zip(_slice, chunks, strict=True) ] # internally handles negative steps for c in product(*intersecting_chunks): - sel_idx, glob_selection, sub_idx = _get_selection(c, _slice, chunks) + sel_idx, glob_selection, sub_idx = get_selection(c, _slice, chunks) sel_idx = tuple(s for s, m in zip(sel_idx, mask, strict=True) if not m) sub_idx = tuple(s if not m else s.start for s, m in zip(sub_idx, mask, strict=True)) - locstart, locstop = _get_local_slice( + locstart, locstop = get_local_slice( glob_selection, (), ((), ()), # switches start and stop for negative steps @@ -4298,8 +4298,8 @@ def __getitem__( key = key[()] if hasattr(key, "shape") and key.shape == () else key # convert to scalar # fancy indexing - if isinstance(key_, (list, np.ndarray)) or builtins.any( - isinstance(k, (list, np.ndarray)) for k in key_ + if isinstance(key_, list | np.ndarray) or builtins.any( + isinstance(k, list | np.ndarray) for k in key_ ): # check scalar booleans, which add 1 dim to beginning if np.issubdtype(type(key), bool) and np.isscalar(key): @@ -4397,7 +4397,7 @@ def __setitem__( value if np.isscalar(value) else blosc2.as_simpleproxy(value) ) # convert to SimpleProxy for e.g. JAX, Tensorflow, PyTorch - if builtins.any(isinstance(k, (list, np.ndarray)) for k in key_): # fancy indexing + if builtins.any(isinstance(k, list | np.ndarray) for k in key_): # fancy indexing _slice = ndindex.ndindex(key_).expand( self.shape ) # handles negative indices -> positive internally @@ -4969,7 +4969,7 @@ def where( return condition.where(x, y) -@_incomplete_lazyfunc +@incomplete_lazyfunc def startswith( a: str | blosc2.Array, prefix: str | blosc2.Array ) -> NDArray: # start: int = 0, end: int | None = None, **kwargs) @@ -5009,7 +5009,7 @@ def startswith( return blosc2.LazyExpr(new_op=(a, "startswith", prefix)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def endswith( a: str | blosc2.Array, suffix: str | blosc2.Array ) -> NDArray: # start: int = 0, end: int | None = None, **kwargs) -> NDArray: @@ -5048,7 +5048,7 @@ def endswith( return blosc2.LazyExpr(new_op=(a, "endswith", suffix)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def lower(a: str | blosc2.Array) -> NDArray: """ Copy-pasted from numpy documentation: https://numpy.org/doc/stable/reference/generated/numpy.char.lower.html @@ -5072,7 +5072,7 @@ def lower(a: str | blosc2.Array) -> NDArray: return blosc2.LazyExpr(new_op=(a, "lower", None)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def upper(a: str | blosc2.Array) -> NDArray: """ Copy-pasted from numpy documentation: https://numpy.org/doc/stable/reference/generated/numpy.char.upper.html @@ -6003,7 +6003,7 @@ def asarray(array: Sequence | blosc2.Array, copy: bool | None = None, **kwargs: raise ValueError("Only unsafe casting is supported at the moment.") if not hasattr(array, "shape"): array = np.asarray(array) # defaults if dtype=None - dtype_ = blosc2.proxy._convert_dtype(array.dtype) + dtype_ = blosc2.proxy.convert_dtype(array.dtype) dtype = kwargs.pop("dtype", dtype_) # check if dtype provided kwargs = _check_ndarray_kwargs(**kwargs) chunks = kwargs.pop("chunks", None) @@ -6013,7 +6013,7 @@ def asarray(array: Sequence | blosc2.Array, copy: bool | None = None, **kwargs: chunks = array.chunks # Zarr adds a .blocks property that maps to a zarr.indexing.BlockIndex object # Let's avoid this - if blocks is None and hasattr(array, "blocks") and isinstance(array.blocks, (tuple, list)): + if blocks is None and hasattr(array, "blocks") and isinstance(array.blocks, tuple | list): blocks = array.blocks copy = True if copy is None and not isinstance(array, NDArray) else copy @@ -6572,7 +6572,7 @@ def take(x: blosc2.Array, indices: blosc2.Array, axis: int | None = None) -> NDA raise ValueError("Must specify axis parameter if x is not 1D.") if axis < 0: axis += x.ndim - if not isinstance(axis, (int, np.integer)): + if not isinstance(axis, int | np.integer): raise ValueError("Axis must be integer.") if isinstance(indices, list): indices = np.asarray(indices) @@ -6604,7 +6604,7 @@ def take_along_axis(x: blosc2.Array, indices: blosc2.Array, axis: int = -1) -> N out: NDArray Selected indices of x. """ - if not isinstance(axis, (int, np.integer)): + if not isinstance(axis, int | np.integer): raise ValueError("Axis must be integer.") if indices.ndim != x.ndim: raise ValueError("Indices must have same dimensions as x.") diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index 923cd9f6..ad64a4d4 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -575,7 +575,7 @@ def __getitem__(self, item: slice | list[slice]) -> np.ndarray: return nparr[self.field] -def _convert_dtype(dt: str | DTypeLike): +def convert_dtype(dt: str | DTypeLike): """ Attempts to convert to blosc2.dtype (i.e. numpy dtype) """ @@ -617,14 +617,14 @@ def __init__(self, src, chunks: tuple | None = None, blocks: tuple | None = None if not hasattr(src, "__getitem__"): raise TypeError("The source must have a __getitem__ method") self._src = src - self._dtype = _convert_dtype(src.dtype) + self._dtype = convert_dtype(src.dtype) self._shape = src.shape if isinstance(src.shape, tuple) else tuple(src.shape) # Compute reasonable values for chunks and blocks cparams = blosc2.CParams(clevel=0) def is_ints_sequence(src, attr): seq = getattr(src, attr, None) - if not isinstance(seq, Sequence) or isinstance(seq, (str, bytes)): + if not isinstance(seq, Sequence) or isinstance(seq, str | bytes): return False return all(isinstance(x, int) for x in seq) diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index a9da58cf..956f41d2 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -1621,7 +1621,7 @@ def _set_default_dparams(kwargs): kwargs["dparams"] = dparams -def _process_opened_object(res): +def process_opened_object(res): meta = getattr(res, "schunk", res).meta if "proxy-source" in meta: proxy_src = meta["proxy-source"] @@ -1635,7 +1635,7 @@ def _process_opened_object(res): raise RuntimeError("Could not find the source when opening a Proxy") if "b2o" in meta: - return blosc2._open_b2object(res) + return blosc2.open_b2object(res) if "vlarray" in meta: from blosc2.vlarray import VLArray @@ -1648,7 +1648,7 @@ def _process_opened_object(res): return BatchStore(_from_schunk=getattr(res, "schunk", res)) if isinstance(res, blosc2.NDArray) and "LazyArray" in res.schunk.meta: - return blosc2._open_lazyarray(res) + return blosc2.open_lazyarray(res) else: return res @@ -1781,4 +1781,4 @@ def open( _set_default_dparams(kwargs) res = blosc2_ext.open(urlpath, mode, offset, **kwargs) - return _process_opened_object(res) + return process_opened_object(res) diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index 67434a3a..9ecbce58 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -80,7 +80,7 @@ def _string_endswith(a, b): return np.char.endswith(a, b) -def _format_expr_scalar(value): +def format_expr_scalar(value): if isinstance(value, np.generic): value = value.item() if isinstance(value, str | bytes): @@ -708,7 +708,7 @@ def _lookup_value(self, node): # noqa : C901 # handle negative constants like -1 if isinstance(node.op, ast.USub): val = self._lookup_value(node.operand) - if isinstance(val, (int, float)): + if isinstance(val, int | float): return -val # handle + (USub) if needed if isinstance(node.op, ast.UAdd): @@ -775,7 +775,7 @@ def slice_to_chunktuple(s, n): return range(start // n, ceiling(stop, n)) -def _get_selection(ctuple, ptuple, chunks): +def get_selection(ctuple, ptuple, chunks): # we assume that at least one element of chunk intersects with the slice # (as a consequence of only looping over intersecting chunks) # ptuple is global slice, ctuple is chunk coords (in units of chunks) @@ -841,7 +841,7 @@ def _get_selection(ctuple, ptuple, chunks): return out_pselection, pselection, loc_selection -def _get_local_slice(prior_selection, post_selection, chunk_bounds): +def get_local_slice(prior_selection, post_selection, chunk_bounds): chunk_begin, chunk_end = chunk_bounds # +1 for negative steps as have to include start (exclude stop) locbegin = np.hstack( @@ -865,7 +865,7 @@ def _get_local_slice(prior_selection, post_selection, chunk_bounds): return locbegin, locend -def _sliced_chunk_iter(chunks, idx, shape, axis=None, nchunk=False): +def sliced_chunk_iter(chunks, idx, shape, axis=None, nchunk=False): """ If nchunk is True, retrun at iterator over the number of the chunk. """ @@ -939,7 +939,7 @@ def get_intersecting_chunks(idx, shape, chunks, axis=None): return chunk_size.as_subchunks(idx, shape) # if _slice is (), returns all chunks # special algorithm to iterate over axis first (adapted from ndindex source) - return _sliced_chunk_iter(chunks, idx, shape, axis) + return sliced_chunk_iter(chunks, idx, shape, axis) def get_chunks_idx(shape, chunks): @@ -966,7 +966,7 @@ def is_inside_ne_evaluate() -> bool: return builtins.any(frame_info.function in {"ne_evaluate"} for frame_info in stack) -def _incomplete_lazyfunc(func) -> None: +def incomplete_lazyfunc(func) -> None: """Decorator for lazy functions with incomplete numexpr/miniexpr coverage. This function will force eager execution when called from ne_evaluate. @@ -1066,7 +1066,7 @@ def compute_smaller_slice(larger_shape, smaller_shape, larger_slice): ) -def _get_chunk_operands(operands, cslice, chunk_operands, shape): +def get_chunk_operands(operands, cslice, chunk_operands, shape): # Get the starts and stops for the slice cslice_shape = tuple(s.stop - s.start for s in cslice) starts = [s.start if s.start is not None else 0 for s in cslice] diff --git a/src/blosc2/_wasm_jit.py b/src/blosc2/wasm_jit.py similarity index 98% rename from src/blosc2/_wasm_jit.py rename to src/blosc2/wasm_jit.py index 54f08410..e8361ec8 100644 --- a/src/blosc2/_wasm_jit.py +++ b/src/blosc2/wasm_jit.py @@ -602,8 +602,8 @@ def init_wasm_jit_helpers() -> bool: from . import blosc2_ext - if not hasattr(blosc2_ext, "_register_wasm_jit_helpers"): - _trace("extension does not expose _register_wasm_jit_helpers") + if not hasattr(blosc2_ext, "register_wasm_jit_helpers"): + _trace("extension does not expose register_wasm_jit_helpers") return False _inject_pyodide_runtime_handles(js) @@ -618,7 +618,7 @@ def init_wasm_jit_helpers() -> bool: instantiate_ptr, free_ptr = helper_ptrs try: - blosc2_ext._register_wasm_jit_helpers(instantiate_ptr, free_ptr) + blosc2_ext.register_wasm_jit_helpers(instantiate_ptr, free_ptr) except Exception as exc: # pragma: no cover - pyodide-specific error path _trace(f"C helper registration failed: {exc}") return False From 46f46ee61ffbd173eaa3fcd865ba1a65f1303b99 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 07:18:24 +0200 Subject: [PATCH 04/13] Preserve legacy LazyExpr persistence behavior and support .b2z operands --- src/blosc2/b2objects.py | 48 +++++++++++++++++++++++++++------- src/blosc2/lazyexpr.py | 42 ++++++++++++++++++++++++++--- tests/ndarray/test_lazyexpr.py | 31 ++++++++++++++++++++++ 3 files changed, 107 insertions(+), 14 deletions(-) diff --git a/src/blosc2/b2objects.py b/src/blosc2/b2objects.py index 5f02119c..f7f518a0 100644 --- a/src/blosc2/b2objects.py +++ b/src/blosc2/b2objects.py @@ -10,6 +10,7 @@ import builtins import inspect import linecache +import pathlib import textwrap from dataclasses import asdict from typing import Any @@ -47,8 +48,11 @@ def encode_operand_reference(obj): return blosc2.Ref.from_object(obj).to_dict() -def decode_operand_reference(payload): - return blosc2.Ref.from_dict(payload).open() +def decode_operand_reference(payload, *, base_path=None): + ref = blosc2.Ref.from_dict(payload) + if ref.kind == "urlpath" and base_path is not None and not pathlib.Path(ref.urlpath).is_absolute(): + return blosc2.open(base_path / ref.urlpath, mode="r") + return ref.open() def encode_b2object_payload(obj) -> dict[str, Any] | None: @@ -98,7 +102,7 @@ def encode_b2object_payload(obj) -> dict[str, Any] | None: return None -def decode_b2object_payload(payload: dict[str, Any]): +def decode_b2object_payload(payload: dict[str, Any], *, carrier_path=None): kind = payload.get("kind") version = payload.get("version") if version != _B2OBJECT_VERSION: @@ -107,24 +111,39 @@ def decode_b2object_payload(payload: dict[str, Any]): ref = blosc2.Ref.from_dict(payload) return ref.open() if kind == "lazyexpr": - return decode_structured_lazyexpr(payload) + return decode_structured_lazyexpr(payload, carrier_path=carrier_path) if kind == "lazyudf": - return decode_structured_lazyudf(payload) + return decode_structured_lazyudf(payload, carrier_path=carrier_path) raise ValueError(f"Unsupported persisted Blosc2 object kind: {kind!r}") -def decode_structured_lazyexpr(payload): +def decode_structured_lazyexpr(payload, *, carrier_path=None): expression = payload.get("expression") if not isinstance(expression, str): raise TypeError("Structured LazyExpr payload requires a string 'expression'") operands_payload = payload.get("operands") if not isinstance(operands_payload, dict): raise TypeError("Structured LazyExpr payload requires a mapping 'operands'") - operands = {key: decode_operand_reference(value) for key, value in operands_payload.items()} + operands = {} + missing_ops = {} + for key, value in operands_payload.items(): + try: + operands[key] = decode_operand_reference(value, base_path=carrier_path) + except FileNotFoundError: + ref = blosc2.Ref.from_dict(value) + if ref.kind == "urlpath": + missing_ops[key] = pathlib.Path(ref.urlpath) + else: + raise + if missing_ops: + exc = blosc2.exceptions.MissingOperands(expression, missing_ops) + exc.expr = expression + exc.missing_ops = missing_ops + raise exc return blosc2.lazyexpr(expression, operands=operands) -def decode_structured_lazyudf(payload): +def decode_structured_lazyudf(payload, *, carrier_path=None): function_kind = payload.get("function_kind") if function_kind != "dsl": raise ValueError(f"Unsupported structured LazyUDF function kind: {function_kind!r}") @@ -167,7 +186,8 @@ def decode_structured_lazyudf(payload): func.dsl_source = dsl_source operands = tuple( - decode_operand_reference(operands_payload[f"o{n}"]) for n in range(len(operands_payload)) + decode_operand_reference(operands_payload[f"o{n}"], base_path=carrier_path) + for n in range(len(operands_payload)) ) return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs) @@ -194,4 +214,12 @@ def open_b2object(obj): raise ValueError(f"Unsupported persisted Blosc2 object version: {marker.get('version')!r}") if marker.get("kind") != payload.get("kind"): raise ValueError("Persisted Blosc2 object marker/payload kind mismatch") - return decode_b2object_payload(payload) + carrier_path = None + schunk = getattr(obj, "schunk", obj) + if getattr(schunk, "urlpath", None) is not None: + carrier_path = pathlib.Path(schunk.urlpath).parent + opened = decode_b2object_payload(payload, carrier_path=carrier_path) + if isinstance(opened, blosc2.LazyExpr | blosc2.LazyUDF): + opened.array = obj + opened.schunk = schunk + return opened diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 773f348f..e04ba5e5 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -3718,12 +3718,46 @@ def save(self, urlpath=None, **kwargs): self._to_b2object_carrier(**kwargs) def to_cframe(self) -> bytes: - return self._to_b2object_carrier().to_cframe() + try: + return self._to_b2object_carrier().to_cframe() + except (TypeError, ValueError): + return self.compute().to_cframe() def _to_b2object_carrier(self, **kwargs): - payload = encode_b2object_payload(self) - if payload is None: - raise TypeError("Unsupported persisted Blosc2 object") + expression = self.expression_tosave if hasattr(self, "expression_tosave") else self.expression + operands_ = self.operands_tosave if hasattr(self, "operands_tosave") else self.operands + validate_expr(expression) + + payload = {"kind": "lazyexpr", "version": 1, "expression": expression, "operands": {}} + carrier_urlpath = kwargs.get("urlpath") + carrier_parent = Path(carrier_urlpath).parent if carrier_urlpath is not None else None + for key, value in operands_.items(): + if isinstance(value, blosc2.C2Array): + payload["operands"][key] = encode_b2object_payload(value) + continue + if isinstance(value, blosc2.Proxy): + value = value._cache + ref = getattr(value, "_blosc2_ref", None) + if isinstance(ref, blosc2.Ref): + payload["operands"][key] = ref.to_dict() + continue + if not hasattr(value, "schunk"): + raise ValueError( + "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" + ) + if value.schunk.urlpath is None: + raise ValueError("To save a LazyArray, all operands must be stored on disk/network") + operand_urlpath = Path(value.schunk.urlpath) + if carrier_parent is not None and not operand_urlpath.is_absolute(): + ref_urlpath = operand_urlpath.as_posix() + elif carrier_parent is not None: + try: + ref_urlpath = operand_urlpath.relative_to(carrier_parent).as_posix() + except ValueError: + ref_urlpath = operand_urlpath.as_posix() + else: + ref_urlpath = operand_urlpath.as_posix() + payload["operands"][key] = {"kind": "urlpath", "version": 1, "urlpath": ref_urlpath} array = make_b2object_carrier( "lazyexpr", self.shape, diff --git a/tests/ndarray/test_lazyexpr.py b/tests/ndarray/test_lazyexpr.py index 6215beb5..dae1ae5c 100644 --- a/tests/ndarray/test_lazyexpr.py +++ b/tests/ndarray/test_lazyexpr.py @@ -1721,6 +1721,37 @@ def test_missing_operator(): blosc2.remove_urlpath("expr.b2nd") +def test_save_dictstore_operands(tmp_path): + store_path = tmp_path / "operands.b2z" + ext_a = tmp_path / "a.b2nd" + ext_b = tmp_path / "b.b2nd" + expr_path = tmp_path / "expr.b2nd" + expected = np.arange(5, dtype=np.int64) * 3 + + a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(ext_a), mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=str(ext_b), mode="w") + with blosc2.DictStore(str(store_path), mode="w", threshold=None) as dstore: + dstore["/a"] = a + dstore["/b"] = b + + with blosc2.DictStore(str(store_path), mode="r") as dstore: + a = dstore["/a"] + b = dstore["/b"] + expr = blosc2.lazyexpr("a + b") + expr.save(expr_path) + + carrier = blosc2.open(expr_path, mode="r").array + assert carrier.schunk.vlmeta["b2o"]["operands"] == { + "a": {"kind": "dictstore_key", "version": 1, "urlpath": str(store_path), "key": "/a"}, + "b": {"kind": "dictstore_key", "version": 1, "urlpath": str(store_path), "key": "/b"}, + } + + restored = blosc2.open(expr_path) + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], expected) + + # Test the chaining of multiple lazy expressions def test_chain_expressions(): N = 1_000 From 3b8409b1153705a689231c634277579a44b43bc7 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 07:25:54 +0200 Subject: [PATCH 05/13] Remove dsl_source and keep udf_source as SSoT for LazyUDF serialization --- src/blosc2/b2objects.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/blosc2/b2objects.py b/src/blosc2/b2objects.py index f7f518a0..411b19f9 100644 --- a/src/blosc2/b2objects.py +++ b/src/blosc2/b2objects.py @@ -93,7 +93,6 @@ def encode_b2object_payload(obj) -> dict[str, Any] | None: "dsl_version": _B2OBJECT_DSL_VERSION, "name": udf_name, "udf_source": udf_source, - "dsl_source": obj.func.dsl_source, "dtype": np.dtype(obj.dtype).str, "shape": list(obj.shape), "operands": {f"o{i}": encode_operand_reference(value) for i, value in enumerate(obj.inputs)}, @@ -181,9 +180,6 @@ def decode_structured_lazyudf(payload, *, carrier_path=None): func = local_ns[name] if not isinstance(func, DSLKernel): func = DSLKernel(func) - dsl_source = payload.get("dsl_source") - if dsl_source is not None and func.dsl_source is None: - func.dsl_source = dsl_source operands = tuple( decode_operand_reference(operands_payload[f"o{n}"], base_path=carrier_path) From c6fa4e44f8fca827d389a9bc548dfac4ba68f938 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 07:48:28 +0200 Subject: [PATCH 06/13] Add b2o-backed persistence for DSL LazyUDF objects --- src/blosc2/b2objects.py | 44 ++++++--- src/blosc2/lazyexpr.py | 124 ++++++++++++++++--------- tests/data/legacy_lazyudf_v1/a.b2nd | Bin 0 -> 293 bytes tests/data/legacy_lazyudf_v1/b.b2nd | Bin 0 -> 293 bytes tests/data/legacy_lazyudf_v1/expr.b2nd | Bin 0 -> 434 bytes tests/ndarray/test_dsl_kernels.py | 30 ++++++ tests/ndarray/test_lazyexpr.py | 8 +- tests/test_b2objects.py | 54 +++++++++++ 8 files changed, 196 insertions(+), 64 deletions(-) create mode 100644 tests/data/legacy_lazyudf_v1/a.b2nd create mode 100644 tests/data/legacy_lazyudf_v1/b.b2nd create mode 100644 tests/data/legacy_lazyudf_v1/expr.b2nd diff --git a/src/blosc2/b2objects.py b/src/blosc2/b2objects.py index 411b19f9..a6116002 100644 --- a/src/blosc2/b2objects.py +++ b/src/blosc2/b2objects.py @@ -49,9 +49,14 @@ def encode_operand_reference(obj): def decode_operand_reference(payload, *, base_path=None): + if ( + payload.get("kind") in {"urlpath", "dictstore_key"} + and base_path is not None + and not pathlib.Path(payload["urlpath"]).is_absolute() + ): + payload = dict(payload) + payload["urlpath"] = (base_path / payload["urlpath"]).as_posix() ref = blosc2.Ref.from_dict(payload) - if ref.kind == "urlpath" and base_path is not None and not pathlib.Path(ref.urlpath).is_absolute(): - return blosc2.open(base_path / ref.urlpath, mode="r") return ref.open() @@ -123,23 +128,28 @@ def decode_structured_lazyexpr(payload, *, carrier_path=None): operands_payload = payload.get("operands") if not isinstance(operands_payload, dict): raise TypeError("Structured LazyExpr payload requires a mapping 'operands'") + operands, missing_ops = decode_operand_mapping(operands_payload, base_path=carrier_path) + if missing_ops: + exc = blosc2.exceptions.MissingOperands(expression, missing_ops) + exc.expr = expression + exc.missing_ops = missing_ops + raise exc + return blosc2.lazyexpr(expression, operands=operands) + + +def decode_operand_mapping(operands_payload, *, base_path=None): operands = {} missing_ops = {} for key, value in operands_payload.items(): try: - operands[key] = decode_operand_reference(value, base_path=carrier_path) + operands[key] = decode_operand_reference(value, base_path=base_path) except FileNotFoundError: ref = blosc2.Ref.from_dict(value) - if ref.kind == "urlpath": + if ref.kind in {"urlpath", "dictstore_key"}: missing_ops[key] = pathlib.Path(ref.urlpath) else: raise - if missing_ops: - exc = blosc2.exceptions.MissingOperands(expression, missing_ops) - exc.expr = expression - exc.missing_ops = missing_ops - raise exc - return blosc2.lazyexpr(expression, operands=operands) + return operands, missing_ops def decode_structured_lazyudf(payload, *, carrier_path=None): @@ -180,12 +190,16 @@ def decode_structured_lazyudf(payload, *, carrier_path=None): func = local_ns[name] if not isinstance(func, DSLKernel): func = DSLKernel(func) - - operands = tuple( - decode_operand_reference(operands_payload[f"o{n}"], base_path=carrier_path) - for n in range(len(operands_payload)) + ordered_operands_payload = {f"o{n}": operands_payload[f"o{n}"] for n in range(len(operands_payload))} + operands, missing_ops = decode_operand_mapping(ordered_operands_payload, base_path=carrier_path) + if missing_ops: + exc = blosc2.exceptions.MissingOperands(name, missing_ops) + exc.expr = name + exc.missing_ops = missing_ops + raise exc + return blosc2.lazyudf( + func, tuple(operands.values()), dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs ) - return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs) def read_b2object_marker(obj) -> dict[str, Any] | None: diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index e04ba5e5..ae6c4413 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -3718,10 +3718,7 @@ def save(self, urlpath=None, **kwargs): self._to_b2object_carrier(**kwargs) def to_cframe(self) -> bytes: - try: - return self._to_b2object_carrier().to_cframe() - except (TypeError, ValueError): - return self.compute().to_cframe() + return self._to_b2object_carrier().to_cframe() def _to_b2object_carrier(self, **kwargs): expression = self.expression_tosave if hasattr(self, "expression_tosave") else self.expression @@ -4079,50 +4076,91 @@ def save(self, urlpath=None, **kwargs): if urlpath is None: raise ValueError("To save a LazyArray you must provide an urlpath") - meta = kwargs.get("meta", {}) - meta["LazyArray"] = LazyArrayEnum.UDF.value kwargs["urlpath"] = urlpath - kwargs["meta"] = meta kwargs["mode"] = "w" # always overwrite the file in urlpath + try: + self._to_b2object_carrier(**kwargs) + except (TypeError, ValueError): + meta = kwargs.get("meta", {}) + meta["LazyArray"] = LazyArrayEnum.UDF.value + kwargs["meta"] = meta - # Create an empty array; useful for providing the shape and dtype of the outcome - array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs) + # Create an empty array; useful for providing the shape and dtype of the outcome + array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs) - # Save the expression and operands in the metadata - operands = {} - operands_ = self.inputs_dict - for i, (_key, value) in enumerate(operands_.items()): - pos_key = f"o{i}" # always use positional keys for consistent loading - if isinstance(value, blosc2.C2Array): - operands[pos_key] = { - "path": str(value.path), - "urlbase": value.urlbase, - } + # Save the expression and operands in the metadata + operands = {} + operands_ = self.inputs_dict + for i, (_key, value) in enumerate(operands_.items()): + pos_key = f"o{i}" # always use positional keys for consistent loading + if isinstance(value, blosc2.C2Array): + operands[pos_key] = { + "path": str(value.path), + "urlbase": value.urlbase, + } + continue + if isinstance(value, blosc2.Proxy): + # Take the required info from the Proxy._cache container + value = value._cache + if not hasattr(value, "schunk"): + raise ValueError( + "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" + ) from None + if value.schunk.urlpath is None: + raise ValueError( + "To save a LazyArray, all operands must be stored on disk/network" + ) from None + operands[pos_key] = value.schunk.urlpath + udf_func = self.func.func if isinstance(self.func, DSLKernel) else self.func + udf_name = getattr(udf_func, "__name__", self.func.__name__) + try: + udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip() + except Exception: + udf_source = None + meta = { + "UDF": udf_source, + "operands": operands, + "name": udf_name, + } + if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None: + meta["dsl_source"] = self.func.dsl_source + array.schunk.vlmeta["_LazyArray"] = meta + + def to_cframe(self) -> bytes: + return self._to_b2object_carrier().to_cframe() + + def _to_b2object_carrier(self, **kwargs): + payload = encode_b2object_payload(self) + if payload is None: + raise TypeError("Persistent Blosc2 object payload is not supported for this LazyUDF") + + carrier_urlpath = kwargs.get("urlpath") + carrier_parent = Path(carrier_urlpath).parent if carrier_urlpath is not None else None + for operand_payload in payload["operands"].values(): + if operand_payload["kind"] not in {"urlpath", "dictstore_key"}: continue - if isinstance(value, blosc2.Proxy): - # Take the required info from the Proxy._cache container - value = value._cache - if not hasattr(value, "schunk"): - raise ValueError( - "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" - ) - if value.schunk.urlpath is None: - raise ValueError("To save a LazyArray, all operands must be stored on disk/network") - operands[pos_key] = value.schunk.urlpath - udf_func = self.func.func if isinstance(self.func, DSLKernel) else self.func - udf_name = getattr(udf_func, "__name__", self.func.__name__) - try: - udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip() - except Exception: - udf_source = None - meta = { - "UDF": udf_source, - "operands": operands, - "name": udf_name, - } - if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None: - meta["dsl_source"] = self.func.dsl_source - array.schunk.vlmeta["_LazyArray"] = meta + operand_urlpath = Path(operand_payload["urlpath"]) + if carrier_parent is not None and not operand_urlpath.is_absolute(): + ref_urlpath = operand_urlpath.as_posix() + elif carrier_parent is not None: + try: + ref_urlpath = operand_urlpath.relative_to(carrier_parent).as_posix() + except ValueError: + ref_urlpath = operand_urlpath.as_posix() + else: + ref_urlpath = operand_urlpath.as_posix() + operand_payload["urlpath"] = ref_urlpath + + array = make_b2object_carrier( + "lazyudf", + self.shape, + self.dtype, + chunks=self.chunks, + blocks=self.blocks, + **kwargs, + ) + write_b2object_payload(array, payload) + return array def _numpy_eval_expr(expression, operands, prefer_blosc=False): diff --git a/tests/data/legacy_lazyudf_v1/a.b2nd b/tests/data/legacy_lazyudf_v1/a.b2nd new file mode 100644 index 0000000000000000000000000000000000000000..c60c13bdcbe5f324c682ede84e071e42d6bda1aa GIT binary patch literal 293 zcmbQYBFQMNC^0vc;SvJ_!=&>-0tgsYmk2S0GF^u77&KrE50D}b5TOAkE;4Wc(V-h` zV1a0TcvbkXc~B2xB1VDU3kJvfX1~ RxWmA34k)aA10e@x0RX%1CW8O~ literal 0 HcmV?d00001 diff --git a/tests/data/legacy_lazyudf_v1/b.b2nd b/tests/data/legacy_lazyudf_v1/b.b2nd new file mode 100644 index 0000000000000000000000000000000000000000..8bcdbccfd0177ab931cf2a4f74213449ecee3040 GIT binary patch literal 293 zcmbQYBFQMNC^0vc;SvJ_!=&>-0tgsYmk2S0GF^u77&KrE50D}b5TOAkE;4Wc(V-h` zV1$$+vU6|%B|#Wu78o$X7zla_BhayI_ZS%N PFfg103M=10$U#{EZIUI# literal 0 HcmV?d00001 diff --git a/tests/data/legacy_lazyudf_v1/expr.b2nd b/tests/data/legacy_lazyudf_v1/expr.b2nd new file mode 100644 index 0000000000000000000000000000000000000000..6a4dd9fad2370fba198ea63fff2f6e9f0bed22ce GIT binary patch literal 434 zcmbQYBFQMNC^0vc;SvJ_!}9Y$0tgs4EfHb}Wx5REF=)UTU_~4tl^S5;A_E5y9lpT^ zR>;T-RnNdM`7DFNJqD&FNk(}oAeB`seG;oG9gB(*D?!|acNmzC0coY_42%k?lCZ~ibt_f<_-fR$fVV* zjIE4kfea8{4b{bfd^zTU5v9&&NHD0@39LsUl(U&NF5Vm0DfuMKHyah@dR1$aI_?6)OdEGqg z`tbVlckABmyQ_5T|9uIj_e|^;j 0 - arr = blosc2.ndarray_from_cframe(cframe) - assert arr.shape == expr.shape - assert arr.dtype == expr.dtype - assert np.allclose(arr[:], expr[:]) + with pytest.raises(ValueError, match="stored on disk/network"): + expr.to_cframe() # Test for the bug where multiplying two complex lazy expressions would fail with: diff --git a/tests/test_b2objects.py b/tests/test_b2objects.py index 11870fa5..2653ae4a 100644 --- a/tests/test_b2objects.py +++ b/tests/test_b2objects.py @@ -15,6 +15,11 @@ import blosc2.c2array as blosc2_c2array +@blosc2.dsl_kernel +def kernel_add_square(x, y): + return x * x + y * y + 2 * x * y + + def _make_c2array( monkeypatch, path="@public/examples/ds-1d.b2nd", @@ -128,3 +133,52 @@ def test_legacy_lazyexpr_open_backward_compat(): assert isinstance(restored, blosc2.LazyExpr) np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3) + + +def test_legacy_lazyudf_open_backward_compat(): + fixture = Path(__file__).parent / "data" / "legacy_lazyudf_v1" / "expr.b2nd" + + restored = blosc2.open(fixture, mode="r") + + assert isinstance(restored, blosc2.LazyUDF) + np.testing.assert_allclose(restored.compute()[:], (np.arange(5, dtype=np.float64) * 3) ** 2) + + +def test_lazyudf_from_cframe_roundtrip(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.float64), urlpath=tmp_path / "a.b2nd", mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.float64) * 2, urlpath=tmp_path / "b.b2nd", mode="w") + expr = blosc2.lazyudf(kernel_add_square, (a, b), dtype=np.float64) + carrier = blosc2.ndarray_from_cframe(expr.to_cframe()) + + assert carrier.schunk.meta["b2o"] == {"kind": "lazyudf", "version": 1} + payload = carrier.schunk.vlmeta["b2o"] + assert payload["kind"] == "lazyudf" + assert payload["version"] == 1 + assert payload["function_kind"] == "dsl" + assert payload["dsl_version"] == 1 + assert payload["name"] == "kernel_add_square" + assert "kernel_add_square" in payload["udf_source"] + assert payload["dtype"] == np.dtype(np.float64).str + assert payload["shape"] == [5] + assert payload["operands"] == { + "o0": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "a.b2nd")}, + "o1": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "b.b2nd")}, + } + + restored = blosc2.from_cframe(expr.to_cframe()) + + assert isinstance(restored, blosc2.LazyUDF) + np.testing.assert_allclose(restored[:], (np.arange(5, dtype=np.float64) * 3) ** 2) + + +def test_lazyudf_open_roundtrip(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.float64), urlpath=tmp_path / "a.b2nd", mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.float64) * 2, urlpath=tmp_path / "b.b2nd", mode="w") + expr = blosc2.lazyudf(kernel_add_square, (a, b), dtype=np.float64) + urlpath = tmp_path / "expr.b2nd" + + expr.save(urlpath) + restored = blosc2.open(urlpath, mode="r") + + assert isinstance(restored, blosc2.LazyUDF) + np.testing.assert_allclose(restored[:], (np.arange(5, dtype=np.float64) * 3) ** 2) From dcf76f644bf58435e1ba68d1ccd6032bd3bb62fd Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 08:04:50 +0200 Subject: [PATCH 07/13] Avoid eager DictStore rehydration and new b2o bundle demo --- examples/embeded-expr-udf-b2z.py | 76 ++++++++++++++++++++++++++++++++ src/blosc2/dict_store.py | 41 ++++++++++------- tests/test_b2objects.py | 25 +++++++++++ 3 files changed, 126 insertions(+), 16 deletions(-) create mode 100644 examples/embeded-expr-udf-b2z.py diff --git a/examples/embeded-expr-udf-b2z.py b/examples/embeded-expr-udf-b2z.py new file mode 100644 index 00000000..8111dd5a --- /dev/null +++ b/examples/embeded-expr-udf-b2z.py @@ -0,0 +1,76 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +import numpy as np + +import blosc2 +from blosc2 import where + + +def show(label, value): + print(f"{label}: {value}") + + +@blosc2.dsl_kernel +def masked_energy(a, b, mask): + return where(mask > 0, a * a + 2 * b, 0.0) + + +bundle_path = "example_b2o_bundle.b2z" +blosc2.remove_urlpath(bundle_path) + +# Build a portable bundle with ordinary arrays plus persisted lazy recipes. +with blosc2.DictStore(bundle_path, mode="w", threshold=1) as store: + store["/data/a"] = np.linspace(0.0, 1.0, 10, dtype=np.float32) + store["/data/b"] = np.linspace(1.0, 2.0, 10, dtype=np.float32) + store["/data/mask"] = np.asarray([0, 1, 1, 0, 1, 0, 1, 1, 0, 1], dtype=np.int8) + + # Reopen the array members through the store so operand refs point back to + # the .b2z container via dictstore_key payloads. + a = store["/data/a"] + b = store["/data/b"] + mask = store["/data/mask"] + + expr = blosc2.lazyexpr("(a - b) / (a + b + 1e-6)", operands={"a": a, "b": b}) + udf = blosc2.lazyudf(masked_energy, (a, b, mask), dtype=np.float32, shape=a.shape) + + # DictStore currently stores array-like external leaves, so persist the + # logical lazy objects through their b2o NDArray carriers. + store["/recipes/expr"] = blosc2.ndarray_from_cframe(expr.to_cframe()) + store["/recipes/udf"] = blosc2.ndarray_from_cframe(udf.to_cframe()) + +show("Bundle created", bundle_path) + +# Reopen the bundle read-only. The persisted LazyExpr and LazyUDF can be +# evaluated directly without re-saving, rebuilding, or re-deploying the .b2z. +with blosc2.open(bundle_path, mode="r") as store: + show("Read-only keys", sorted(store.keys())) + + expr = store["/recipes/expr"] + udf = store["/recipes/udf"] + + expr_result = expr.compute() + udf_result = udf.compute() + + show("Reopened expr type", type(expr).__name__) + show("Reopened udf type", type(udf).__name__) + show("Expr operand refs", expr.array.schunk.vlmeta["b2o"]["operands"]) + show("UDF operand refs", udf.array.schunk.vlmeta["b2o"]["operands"]) + show("Expr values", np.round(expr_result[:], 4)) + show("UDF values", udf_result[:]) + + expected_expr = (store["/data/a"][:] - store["/data/b"][:]) / ( + store["/data/a"][:] + store["/data/b"][:] + 1e-6 + ) + expected_udf = np.where( + store["/data/mask"][:] > 0, + store["/data/a"][:] ** 2 + 2 * store["/data/b"][:], + 0.0, + ).astype(np.float32) + np.testing.assert_allclose(expr_result[:], expected_expr, rtol=1e-6, atol=1e-6) + np.testing.assert_allclose(udf_result[:], expected_udf, rtol=1e-6, atol=1e-6) + show("Read-only evaluation", "ok") diff --git a/src/blosc2/dict_store.py b/src/blosc2/dict_store.py index d88ce4a4..0ffcc8d8 100644 --- a/src/blosc2/dict_store.py +++ b/src/blosc2/dict_store.py @@ -227,30 +227,39 @@ def _opened_external_kind( rel_path: str, ) -> str | None: """Return the supported external leaf kind for an already opened object.""" - processed = process_opened_object(opened) - if isinstance(processed, blosc2.BatchStore): - kind = "batchstore" - elif isinstance(processed, blosc2.VLArray): - kind = "vlarray" - elif isinstance(processed, blosc2.NDArray): + meta = getattr(opened, "schunk", opened).meta + if "b2o" in meta and isinstance(opened, blosc2.NDArray): + # Keep b2o carriers as NDArray external leaves during discovery. + # Rehydrating them here can recurse when a lazy recipe points back + # into the same DictStore via dictstore_key refs. kind = "ndarray" - elif isinstance(processed, SChunk): - kind = "schunk" + processed_name = type(opened).__name__ else: - warnings.warn( - f"Ignoring unsupported Blosc2 object at '{rel_path}' during DictStore discovery: " - f"{type(processed).__name__}", - UserWarning, - stacklevel=2, - ) - return None + processed = process_opened_object(opened) + processed_name = type(processed).__name__ + if isinstance(processed, blosc2.BatchStore): + kind = "batchstore" + elif isinstance(processed, blosc2.VLArray): + kind = "vlarray" + elif isinstance(processed, blosc2.NDArray): + kind = "ndarray" + elif isinstance(processed, SChunk): + kind = "schunk" + else: + warnings.warn( + f"Ignoring unsupported Blosc2 object at '{rel_path}' during DictStore discovery: " + f"{processed_name}", + UserWarning, + stacklevel=2, + ) + return None expected_ext = cls._expected_ext_from_kind(kind) found_ext = os.path.splitext(rel_path)[1] if found_ext != expected_ext: warnings.warn( f"External leaf '{rel_path}' uses extension '{found_ext}' but metadata resolves to " - f"{type(processed).__name__}; expected '{expected_ext}'.", + f"{processed_name}; expected '{expected_ext}'.", UserWarning, stacklevel=2, ) diff --git a/tests/test_b2objects.py b/tests/test_b2objects.py index 2653ae4a..a8a142a5 100644 --- a/tests/test_b2objects.py +++ b/tests/test_b2objects.py @@ -182,3 +182,28 @@ def test_lazyudf_open_roundtrip(tmp_path): assert isinstance(restored, blosc2.LazyUDF) np.testing.assert_allclose(restored[:], (np.arange(5, dtype=np.float64) * 3) ** 2) + + +def test_b2z_bundle_with_lazy_recipes_opens_read_only(tmp_path): + bundle_path = tmp_path / "bundle.b2z" + + with blosc2.DictStore(str(bundle_path), mode="w", threshold=1) as store: + store["/data/a"] = np.arange(5, dtype=np.float64) + store["/data/b"] = np.arange(5, dtype=np.float64) * 2 + + a = store["/data/a"] + b = store["/data/b"] + expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b}) + udf = blosc2.lazyudf(kernel_add_square, (a, b), dtype=np.float64, shape=a.shape) + + store["/recipes/expr"] = blosc2.ndarray_from_cframe(expr.to_cframe()) + store["/recipes/udf"] = blosc2.ndarray_from_cframe(udf.to_cframe()) + + with blosc2.open(str(bundle_path), mode="r") as store: + restored_expr = store["/recipes/expr"] + restored_udf = store["/recipes/udf"] + + assert isinstance(restored_expr, blosc2.LazyExpr) + assert isinstance(restored_udf, blosc2.LazyUDF) + np.testing.assert_allclose(restored_expr.compute()[:], np.arange(5, dtype=np.float64) * 3) + np.testing.assert_allclose(restored_udf.compute()[:], (np.arange(5, dtype=np.float64) * 3) ** 2) From eeb865080e8622d06f44bf075c0424ed7e347847 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 08:13:46 +0200 Subject: [PATCH 08/13] Allow reopened proxies to fill caches after read-only open --- src/blosc2/proxy.py | 30 ++++++++++++++++++++++++++++++ src/blosc2/schunk.py | 3 +++ tests/ndarray/test_proxy.py | 11 +++++++++++ 3 files changed, 44 insertions(+) diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index ad64a4d4..fdd04fe2 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -192,6 +192,11 @@ class Proxy(blosc2.Operand): This can be used to cache chunks of a regular data container which follows the :ref:`ProxySource` or :ref:`ProxyNDSource` interfaces. + + If a persisted proxy is reopened in read mode, lazy cache fills still need to + update the local cache. In that case the cache file is reopened internally + in append mode on demand when :meth:`fetch`, :meth:`afetch`, or + :meth:`__getitem__` needs to populate missing chunks. """ def __init__( @@ -209,6 +214,9 @@ def __init__( mode: str, optional "a" means read/write (create if it doesn't exist); "w" means create (overwrite if it exists). Default is "a". + When a persisted proxy is reopened through :func:`blosc2.open` with + ``mode="r"``, later lazy cache fills may reopen the cache file + internally in append mode so missing chunks can be written locally. kwargs: dict, optional Keyword arguments supported: @@ -259,11 +267,21 @@ def __init__( ) self._cache.fill_special(self.src.nbytes // self.src.typesize, blosc2.SpecialValue.UNINIT) self._schunk_cache = getattr(self._cache, "schunk", self._cache) + if self.urlpath is None: + self.urlpath = getattr(self._schunk_cache, "urlpath", None) vlmeta = kwargs.get("vlmeta") if vlmeta: for key in vlmeta: self._schunk_cache.vlmeta[key] = vlmeta[key] + def _ensure_writable_cache(self) -> None: + """Reopen a persisted cache writable when lazy fetch needs to fill chunks.""" + cache_urlpath = getattr(self._schunk_cache, "urlpath", None) + if cache_urlpath is None or getattr(self._schunk_cache, "mode", None) != "r": + return + self._cache = blosc2.blosc2_ext.open(cache_urlpath, "a", 0) + self._schunk_cache = getattr(self._cache, "schunk", self._cache) + def fetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray | blosc2.schunk.SChunk: """ Get the container used as cache with the requested data updated. @@ -279,6 +297,13 @@ def fetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray | blosc out: :ref:`NDArray` or :ref:`SChunk` The local container used to cache the already requested data. + Notes + ----- + If the proxy cache was reopened read-only from disk, this method may + reopen that cache internally in append mode before filling missing + chunks. This preserves the logical read-only open of the proxy source + while still allowing the local cache to be populated lazily. + Examples -------- >>> import numpy as np @@ -292,6 +317,7 @@ def fetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray | blosc [2 3] [4 5]] """ + self._ensure_writable_cache() if item == (): # Full realization for info in self._schunk_cache.iterchunks_info(): @@ -327,6 +353,9 @@ async def afetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray ----- This method is only available if the :ref:`ProxySource` or :ref:`ProxyNDSource` have an async `aget_chunk` method. + If the proxy cache was reopened read-only from disk, this method may + reopen that cache internally in append mode before filling missing + chunks. Examples -------- @@ -384,6 +413,7 @@ async def afetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray """ if not callable(getattr(self.src, "aget_chunk", None)): raise NotImplementedError("afetch is only available if the source has an aget_chunk method") + self._ensure_writable_cache() if item == (): # Full realization for info in self._schunk_cache.iterchunks_info(): diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index 956f41d2..fbb54834 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -1721,6 +1721,9 @@ def open( it will return the Python-Blosc2 container used to cache the data which can be a :ref:`SChunk` or a :ref:`NDArray` and may not have all the data initialized (e.g. if the user has not accessed to it yet). + When such a persisted proxy is opened with ``mode="r"``, later lazy cache + fills may reopen the local cache internally in append mode so missing + chunks can still be written. * When opening a :ref:`LazyExpr` keep in mind the note above regarding operands. diff --git a/tests/ndarray/test_proxy.py b/tests/ndarray/test_proxy.py index 15648e99..1ed33f21 100644 --- a/tests/ndarray/test_proxy.py +++ b/tests/ndarray/test_proxy.py @@ -105,6 +105,17 @@ def test_open(urlpath, shape, chunks, blocks, slices, dtype): blosc2.remove_urlpath(proxy_urlpath) +def test_open_read_mode_allows_proxy_cache_fill(tmp_path): + src_urlpath = str(tmp_path / "src.b2nd") + proxy_urlpath = str(tmp_path / "proxy.b2nd") + + a = blosc2.asarray(np.arange(25, dtype=np.int64).reshape(5, 5), urlpath=src_urlpath, mode="w") + _ = blosc2.Proxy(a, urlpath=proxy_urlpath, mode="w") + + proxy = blosc2.open(proxy_urlpath, mode="r") + np.testing.assert_array_equal(proxy[:], a[:]) + + # Test the ProxyNDSources interface @pytest.mark.parametrize( ("shape", "chunks", "blocks"), From 3a572963e1a0f49b9b9d1f6bffeb7e09df8c96c5 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 09:27:16 +0200 Subject: [PATCH 09/13] Use default reopen mode for persisted lazyexpr urlpath operands --- src/blosc2/proxy.py | 28 ---------------------------- src/blosc2/ref.py | 2 +- src/blosc2/schunk.py | 3 --- tests/ndarray/test_lazyexpr.py | 16 ++++++++++++++++ tests/ndarray/test_proxy.py | 11 ----------- 5 files changed, 17 insertions(+), 43 deletions(-) diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index fdd04fe2..8b946428 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -192,11 +192,6 @@ class Proxy(blosc2.Operand): This can be used to cache chunks of a regular data container which follows the :ref:`ProxySource` or :ref:`ProxyNDSource` interfaces. - - If a persisted proxy is reopened in read mode, lazy cache fills still need to - update the local cache. In that case the cache file is reopened internally - in append mode on demand when :meth:`fetch`, :meth:`afetch`, or - :meth:`__getitem__` needs to populate missing chunks. """ def __init__( @@ -214,9 +209,6 @@ def __init__( mode: str, optional "a" means read/write (create if it doesn't exist); "w" means create (overwrite if it exists). Default is "a". - When a persisted proxy is reopened through :func:`blosc2.open` with - ``mode="r"``, later lazy cache fills may reopen the cache file - internally in append mode so missing chunks can be written locally. kwargs: dict, optional Keyword arguments supported: @@ -274,14 +266,6 @@ def __init__( for key in vlmeta: self._schunk_cache.vlmeta[key] = vlmeta[key] - def _ensure_writable_cache(self) -> None: - """Reopen a persisted cache writable when lazy fetch needs to fill chunks.""" - cache_urlpath = getattr(self._schunk_cache, "urlpath", None) - if cache_urlpath is None or getattr(self._schunk_cache, "mode", None) != "r": - return - self._cache = blosc2.blosc2_ext.open(cache_urlpath, "a", 0) - self._schunk_cache = getattr(self._cache, "schunk", self._cache) - def fetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray | blosc2.schunk.SChunk: """ Get the container used as cache with the requested data updated. @@ -297,13 +281,6 @@ def fetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray | blosc out: :ref:`NDArray` or :ref:`SChunk` The local container used to cache the already requested data. - Notes - ----- - If the proxy cache was reopened read-only from disk, this method may - reopen that cache internally in append mode before filling missing - chunks. This preserves the logical read-only open of the proxy source - while still allowing the local cache to be populated lazily. - Examples -------- >>> import numpy as np @@ -317,7 +294,6 @@ def fetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray | blosc [2 3] [4 5]] """ - self._ensure_writable_cache() if item == (): # Full realization for info in self._schunk_cache.iterchunks_info(): @@ -353,9 +329,6 @@ async def afetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray ----- This method is only available if the :ref:`ProxySource` or :ref:`ProxyNDSource` have an async `aget_chunk` method. - If the proxy cache was reopened read-only from disk, this method may - reopen that cache internally in append mode before filling missing - chunks. Examples -------- @@ -413,7 +386,6 @@ async def afetch(self, item: slice | list[slice] | None = ()) -> blosc2.NDArray """ if not callable(getattr(self.src, "aget_chunk", None)): raise NotImplementedError("afetch is only available if the source has an aget_chunk method") - self._ensure_writable_cache() if item == (): # Full realization for info in self._schunk_cache.iterchunks_info(): diff --git a/src/blosc2/ref.py b/src/blosc2/ref.py index 9534d271..c0acca8e 100644 --- a/src/blosc2/ref.py +++ b/src/blosc2/ref.py @@ -118,7 +118,7 @@ def open(self): import blosc2 if self.kind == "urlpath": - return blosc2.open(self.urlpath, mode="r") + return blosc2.open(self.urlpath) if self.kind == "dictstore_key": return blosc2.DictStore(self.urlpath, mode="r")[self.key] if self.kind == "c2array": diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index fbb54834..956f41d2 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -1721,9 +1721,6 @@ def open( it will return the Python-Blosc2 container used to cache the data which can be a :ref:`SChunk` or a :ref:`NDArray` and may not have all the data initialized (e.g. if the user has not accessed to it yet). - When such a persisted proxy is opened with ``mode="r"``, later lazy cache - fills may reopen the local cache internally in append mode so missing - chunks can still be written. * When opening a :ref:`LazyExpr` keep in mind the note above regarding operands. diff --git a/tests/ndarray/test_lazyexpr.py b/tests/ndarray/test_lazyexpr.py index a48769a3..ca4fc254 100644 --- a/tests/ndarray/test_lazyexpr.py +++ b/tests/ndarray/test_lazyexpr.py @@ -1752,6 +1752,22 @@ def test_save_dictstore_operands(tmp_path): np.testing.assert_array_equal(restored[:], expected) +def test_save_proxy_operands_reopen_default_mode(tmp_path): + src_path = tmp_path / "src.b2nd" + proxy_path = tmp_path / "proxy.b2nd" + expr_path = tmp_path / "expr.b2nd" + + src = blosc2.asarray(np.arange(10, dtype=np.int64), urlpath=str(src_path), mode="w") + proxy = blosc2.Proxy(src, urlpath=str(proxy_path), mode="w") + expr = proxy + proxy + expr.save(str(expr_path)) + + restored = blosc2.open(str(expr_path)) + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], np.arange(10, dtype=np.int64) * 2) + + # Test the chaining of multiple lazy expressions def test_chain_expressions(): N = 1_000 diff --git a/tests/ndarray/test_proxy.py b/tests/ndarray/test_proxy.py index 1ed33f21..15648e99 100644 --- a/tests/ndarray/test_proxy.py +++ b/tests/ndarray/test_proxy.py @@ -105,17 +105,6 @@ def test_open(urlpath, shape, chunks, blocks, slices, dtype): blosc2.remove_urlpath(proxy_urlpath) -def test_open_read_mode_allows_proxy_cache_fill(tmp_path): - src_urlpath = str(tmp_path / "src.b2nd") - proxy_urlpath = str(tmp_path / "proxy.b2nd") - - a = blosc2.asarray(np.arange(25, dtype=np.int64).reshape(5, 5), urlpath=src_urlpath, mode="w") - _ = blosc2.Proxy(a, urlpath=proxy_urlpath, mode="w") - - proxy = blosc2.open(proxy_urlpath, mode="r") - np.testing.assert_array_equal(proxy[:], a[:]) - - # Test the ProxyNDSources interface @pytest.mark.parametrize( ("shape", "chunks", "blocks"), From 660a165b8bba8bfb4423eb7be1d94827c4650e18 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 09:37:29 +0200 Subject: [PATCH 10/13] Update DictStore test for discovered LazyExpr leaves --- tests/test_dict_store.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_dict_store.py b/tests/test_dict_store.py index 337ace30..4347eaf4 100644 --- a/tests/test_dict_store.py +++ b/tests/test_dict_store.py @@ -401,7 +401,7 @@ def test_metadata_discovery_reopens_renamed_external_vlarray(storage_type, tmp_p assert value.vlmeta["description"] == "Renamed VLArray" -def test_metadata_discovery_warns_and_skips_unsupported_blosc2_leaf(tmp_path): +def test_metadata_discovery_reopens_lazyexpr_leaf(tmp_path): path = tmp_path / "test_unsupported_lazyexpr.b2d" with DictStore(str(path), mode="w") as dstore: @@ -413,13 +413,13 @@ def test_metadata_discovery_warns_and_skips_unsupported_blosc2_leaf(tmp_path): expr_path = path / "unsupported_lazyexpr.b2nd" expr.save(str(expr_path)) - with pytest.warns( - UserWarning, match=r"Ignoring unsupported Blosc2 object.*unsupported_lazyexpr\.b2nd.*LazyExpr" - ): - dstore_read = DictStore(str(path), mode="r") + dstore_read = DictStore(str(path), mode="r") with dstore_read: - assert "/unsupported_lazyexpr" not in dstore_read + assert "/unsupported_lazyexpr" in dstore_read assert "/embedded" in dstore_read + value = dstore_read["/unsupported_lazyexpr"] + assert isinstance(value, blosc2.LazyExpr) + np.testing.assert_array_equal(value[:], np.arange(5) * 2) def _digest_value(value): From 663cfe11ff0f18030df226c54657fac100254f93 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 10:00:31 +0200 Subject: [PATCH 11/13] Add vlmeta support to LazyArray objects --- doc/reference/lazyarray.rst | 11 ++++- src/blosc2/b2objects.py | 13 ++++++ src/blosc2/lazyexpr.py | 73 +++++++++++++++++++++++++++++++++- tests/ndarray/test_lazyexpr.py | 23 +++++++++++ tests/ndarray/test_lazyudf.py | 17 ++++++++ 5 files changed, 135 insertions(+), 2 deletions(-) diff --git a/doc/reference/lazyarray.rst b/doc/reference/lazyarray.rst index 9be693c7..1080a62b 100644 --- a/doc/reference/lazyarray.rst +++ b/doc/reference/lazyarray.rst @@ -14,6 +14,11 @@ You can get an object following the LazyArray API with any of the following ways The LazyArray object is a thin wrapper around the expression or user-defined function that allows for lazy computation. This means that the expression is not computed until the ``compute`` or ``__getitem__`` methods are called. The ``compute`` method will return a new NDArray object with the result of the expression evaluation. The ``__getitem__`` method will return an NumPy object instead. +LazyArray objects also support user metadata via :attr:`LazyArray.vlmeta`. For +in-memory objects, this metadata lives on the Python object itself. For +persisted LazyArrays reopened from disk, metadata is synchronized with the +underlying carrier and survives reopening. + See the `LazyExpr`_ and `LazyUDF`_ sections for more information. .. currentmodule:: blosc2 @@ -33,6 +38,10 @@ See the `LazyExpr`_ and `LazyUDF`_ sections for more information. --------------- .. automethod:: __getitem__ + Attributes + ---------- + .. autoattribute:: vlmeta + .. _LazyExpr: LazyExpr @@ -51,7 +60,7 @@ LazyUDF For getting a LazyUDF object (which is LazyArray-compliant) from a user-defined Python function, you can use the lazyudf constructor below. See `a tutorial on how this works <../getting_started/tutorials/03.lazyarray-udf.html>`_. -This object follows the `LazyArray`_ API for computation, although storage is not supported yet. +This object follows the `LazyArray`_ API for computation and storage. .. autofunction:: lazyudf diff --git a/src/blosc2/b2objects.py b/src/blosc2/b2objects.py index a6116002..50c583e6 100644 --- a/src/blosc2/b2objects.py +++ b/src/blosc2/b2objects.py @@ -23,6 +23,7 @@ _B2OBJECT_META_KEY = "b2o" _B2OBJECT_VERSION = 1 _B2OBJECT_DSL_VERSION = 1 +_B2OBJECT_USER_VLMETA_KEY = "_b2o_user_vlmeta" def make_b2object_carrier( @@ -44,6 +45,17 @@ def write_b2object_payload(array, payload: dict[str, Any]) -> None: array.schunk.vlmeta[_B2OBJECT_META_KEY] = payload +def write_b2object_user_vlmeta(array, user_vlmeta: dict[str, Any]) -> None: + array.schunk.vlmeta[_B2OBJECT_USER_VLMETA_KEY] = user_vlmeta + + +def read_b2object_user_vlmeta(obj) -> dict[str, Any]: + schunk = getattr(obj, "schunk", obj) + if _B2OBJECT_USER_VLMETA_KEY not in schunk.vlmeta: + return {} + return schunk.vlmeta[_B2OBJECT_USER_VLMETA_KEY] + + def encode_operand_reference(obj): return blosc2.Ref.from_object(obj).to_dict() @@ -232,4 +244,5 @@ def open_b2object(obj): if isinstance(opened, blosc2.LazyExpr | blosc2.LazyUDF): opened.array = obj opened.schunk = schunk + opened._set_user_vlmeta(read_b2object_user_vlmeta(obj), sync=False) return opened diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index ae6c4413..63d656bc 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -24,6 +24,7 @@ import textwrap import threading from abc import ABC, abstractmethod, abstractproperty +from collections.abc import MutableMapping from dataclasses import asdict from enum import Enum from pathlib import Path @@ -42,7 +43,13 @@ import blosc2 -from .b2objects import encode_b2object_payload, make_b2object_carrier, write_b2object_payload +from .b2objects import ( + encode_b2object_payload, + make_b2object_carrier, + read_b2object_user_vlmeta, + write_b2object_payload, + write_b2object_user_vlmeta, +) from .dsl_kernel import DSLKernel, DSLSyntaxError, DSLValidator, specialize_miniexpr_inputs if blosc2._HAS_NUMBA: @@ -325,7 +332,64 @@ class LazyArrayEnum(Enum): UDF = 1 +class LazyArrayVLMeta(MutableMapping): + """User metadata attached to a LazyArray.""" + + def __init__(self, lazyarr: LazyArray): + self.lazyarr = lazyarr + + def __getitem__(self, key): + return self.lazyarr._get_user_vlmeta()[key] + + def __setitem__(self, key, value): + data = self.lazyarr._get_user_vlmeta() + data[key] = value + self.lazyarr._sync_user_vlmeta() + + def __delitem__(self, key): + data = self.lazyarr._get_user_vlmeta() + del data[key] + self.lazyarr._sync_user_vlmeta() + + def __iter__(self): + return iter(self.lazyarr._get_user_vlmeta()) + + def __len__(self): + return len(self.lazyarr._get_user_vlmeta()) + + def getall(self): + return self.lazyarr._get_user_vlmeta().copy() + + def __repr__(self): + return repr(self.getall()) + + def __str__(self): + return str(self.getall()) + + class LazyArray(ABC, blosc2.Operand): + def _get_user_vlmeta(self) -> dict[str, Any]: + if not hasattr(self, "_vlmeta_user"): + self._vlmeta_user = {} + return self._vlmeta_user + + def _set_user_vlmeta(self, metadata: dict[str, Any], *, sync: bool = True) -> None: + self._vlmeta_user = dict(metadata) + if sync: + self._sync_user_vlmeta() + + def _sync_user_vlmeta(self) -> None: + array = getattr(self, "array", None) + if array is not None: + write_b2object_user_vlmeta(array, self._get_user_vlmeta()) + + @property + def vlmeta(self) -> LazyArrayVLMeta: + """User variable-length metadata for this LazyArray.""" + if not hasattr(self, "_vlmeta_proxy"): + self._vlmeta_proxy = LazyArrayVLMeta(self) + return self._vlmeta_proxy + @abstractmethod def indices(self, order: str | list[str] | None = None) -> blosc2.LazyArray: """ @@ -494,6 +558,9 @@ def save(self, **kwargs: Any) -> None: section for more info). * This is currently only supported for :ref:`LazyExpr` and :ref:`LazyUDF` (including kernels decorated with :func:`blosc2.dsl_kernel`). + * User metadata can be attached via :attr:`vlmeta`. For in-memory LazyArrays + this stays in memory; for persisted LazyArrays it is serialized and restored + on reopen. Examples -------- @@ -3764,6 +3831,7 @@ def _to_b2object_carrier(self, **kwargs): **kwargs, ) write_b2object_payload(array, payload) + write_b2object_user_vlmeta(array, self._get_user_vlmeta()) return array @classmethod @@ -4125,6 +4193,7 @@ def save(self, urlpath=None, **kwargs): if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None: meta["dsl_source"] = self.func.dsl_source array.schunk.vlmeta["_LazyArray"] = meta + write_b2object_user_vlmeta(array, self._get_user_vlmeta()) def to_cframe(self) -> bytes: return self._to_b2object_carrier().to_cframe() @@ -4160,6 +4229,7 @@ def _to_b2object_carrier(self, **kwargs): **kwargs, ) write_b2object_payload(array, payload) + write_b2object_user_vlmeta(array, self._get_user_vlmeta()) return array @@ -4518,6 +4588,7 @@ def open_lazyarray(array): new_expr.array = array # We want to expose schunk too, so that .info() can be used on the LazyArray new_expr.schunk = array.schunk + new_expr._set_user_vlmeta(read_b2object_user_vlmeta(array), sync=False) return new_expr diff --git a/tests/ndarray/test_lazyexpr.py b/tests/ndarray/test_lazyexpr.py index ca4fc254..cf4f0f63 100644 --- a/tests/ndarray/test_lazyexpr.py +++ b/tests/ndarray/test_lazyexpr.py @@ -1768,6 +1768,29 @@ def test_save_proxy_operands_reopen_default_mode(tmp_path): np.testing.assert_array_equal(restored[:], np.arange(10, dtype=np.int64) * 2) +def test_lazyexpr_vlmeta_in_memory_and_persisted(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(tmp_path / "a.b2nd"), mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(tmp_path / "b.b2nd"), mode="w") + expr = a + b + + expr.vlmeta["name"] = "sum" + expr.vlmeta["config"] = {"scale": 1} + assert expr.vlmeta["name"] == "sum" + assert expr.vlmeta["config"] == {"scale": 1} + + expr_path = tmp_path / "expr_vlmeta.b2nd" + expr.save(str(expr_path)) + restored = blosc2.open(str(expr_path)) + + assert restored.vlmeta["name"] == "sum" + assert restored.vlmeta["config"] == {"scale": 1} + + restored.vlmeta["note"] = "persisted" + reopened = blosc2.open(str(expr_path)) + assert reopened.vlmeta["note"] == "persisted" + np.testing.assert_array_equal(reopened[:], np.arange(5, dtype=np.int64) * 2) + + # Test the chaining of multiple lazy expressions def test_chain_expressions(): N = 1_000 diff --git a/tests/ndarray/test_lazyudf.py b/tests/ndarray/test_lazyudf.py index f2b164f5..3aa76cb6 100644 --- a/tests/ndarray/test_lazyudf.py +++ b/tests/ndarray/test_lazyudf.py @@ -501,6 +501,23 @@ def test_save_ludf(): blosc2.remove_urlpath(urlpath) +def test_lazyudf_vlmeta_roundtrip(tmp_path): + a_path = tmp_path / "a.b2nd" + expr_path = tmp_path / "lazyudf_vlmeta.b2nd" + array = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(a_path), mode="w") + expr = blosc2.lazyudf(udf1p, (array,), np.float64) + + expr.vlmeta["name"] = "increment" + expr.vlmeta["attrs"] = {"version": 1} + expr.save(urlpath=str(expr_path)) + + restored = blosc2.open(str(expr_path)) + + assert isinstance(restored, blosc2.LazyUDF) + assert restored.vlmeta["name"] == "increment" + assert restored.vlmeta["attrs"] == {"version": 1} + + # Test get_chunk method def test_get_chunk(): a = blosc2.linspace(0, 100, 100, shape=(10, 10), chunks=(3, 4), blocks=(2, 3)) From 2f332b130570ff01b4db207193424f6d36a1573c Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 10:08:35 +0200 Subject: [PATCH 12/13] Preserve lazy b2object carriers when storing them in DictStore --- src/blosc2/dict_store.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/blosc2/dict_store.py b/src/blosc2/dict_store.py index 0ffcc8d8..21b5e821 100644 --- a/src/blosc2/dict_store.py +++ b/src/blosc2/dict_store.py @@ -397,7 +397,20 @@ def __setitem__( # Save the value to the destination path if not external_file: - if hasattr(value, "save"): + if isinstance(value, blosc2.NDArray) and "b2o" in value.schunk.meta: + carrier = blosc2.empty( + value.shape, + value.dtype, + chunks=value.chunks, + blocks=value.blocks, + cparams=value.cparams, + urlpath=dest_path, + mode="w", + meta={"b2o": value.schunk.meta["b2o"]}, + ) + for meta_key, meta_value in value.schunk.vlmeta[:].items(): + carrier.schunk.vlmeta[meta_key] = meta_value + elif hasattr(value, "save"): value.save(urlpath=dest_path) else: # SChunk, VLArray and BatchStore can all be persisted via their cframe. From 6de6630f602f28c9f7719533401212c46790b768 Mon Sep 17 00:00:00 2001 From: Francesc Alted Date: Mon, 30 Mar 2026 10:12:13 +0200 Subject: [PATCH 13/13] Normalize b2object urlpath expectations in tests --- tests/test_b2objects.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_b2objects.py b/tests/test_b2objects.py index a8a142a5..6317d34e 100644 --- a/tests/test_b2objects.py +++ b/tests/test_b2objects.py @@ -102,8 +102,8 @@ def test_lazyexpr_from_cframe_roundtrip(tmp_path): "version": 1, "expression": "a + b", "operands": { - "a": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "a.b2nd")}, - "b": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "b.b2nd")}, + "a": {"kind": "urlpath", "version": 1, "urlpath": (tmp_path / "a.b2nd").as_posix()}, + "b": {"kind": "urlpath", "version": 1, "urlpath": (tmp_path / "b.b2nd").as_posix()}, }, } @@ -161,8 +161,8 @@ def test_lazyudf_from_cframe_roundtrip(tmp_path): assert payload["dtype"] == np.dtype(np.float64).str assert payload["shape"] == [5] assert payload["operands"] == { - "o0": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "a.b2nd")}, - "o1": {"kind": "urlpath", "version": 1, "urlpath": str(tmp_path / "b.b2nd")}, + "o0": {"kind": "urlpath", "version": 1, "urlpath": (tmp_path / "a.b2nd").as_posix()}, + "o1": {"kind": "urlpath", "version": 1, "urlpath": (tmp_path / "b.b2nd").as_posix()}, } restored = blosc2.from_cframe(expr.to_cframe())