diff --git a/doc/reference/lazyarray.rst b/doc/reference/lazyarray.rst index 9be693c7..1080a62b 100644 --- a/doc/reference/lazyarray.rst +++ b/doc/reference/lazyarray.rst @@ -14,6 +14,11 @@ You can get an object following the LazyArray API with any of the following ways The LazyArray object is a thin wrapper around the expression or user-defined function that allows for lazy computation. This means that the expression is not computed until the ``compute`` or ``__getitem__`` methods are called. The ``compute`` method will return a new NDArray object with the result of the expression evaluation. The ``__getitem__`` method will return an NumPy object instead. +LazyArray objects also support user metadata via :attr:`LazyArray.vlmeta`. For +in-memory objects, this metadata lives on the Python object itself. For +persisted LazyArrays reopened from disk, metadata is synchronized with the +underlying carrier and survives reopening. + See the `LazyExpr`_ and `LazyUDF`_ sections for more information. .. currentmodule:: blosc2 @@ -33,6 +38,10 @@ See the `LazyExpr`_ and `LazyUDF`_ sections for more information. --------------- .. automethod:: __getitem__ + Attributes + ---------- + .. autoattribute:: vlmeta + .. _LazyExpr: LazyExpr @@ -51,7 +60,7 @@ LazyUDF For getting a LazyUDF object (which is LazyArray-compliant) from a user-defined Python function, you can use the lazyudf constructor below. See `a tutorial on how this works <../getting_started/tutorials/03.lazyarray-udf.html>`_. -This object follows the `LazyArray`_ API for computation, although storage is not supported yet. +This object follows the `LazyArray`_ API for computation and storage. .. 
autofunction:: lazyudf diff --git a/examples/embeded-expr-udf-b2z.py b/examples/embeded-expr-udf-b2z.py new file mode 100644 index 00000000..8111dd5a --- /dev/null +++ b/examples/embeded-expr-udf-b2z.py @@ -0,0 +1,76 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +import numpy as np + +import blosc2 +from blosc2 import where + + +def show(label, value): + print(f"{label}: {value}") + + +@blosc2.dsl_kernel +def masked_energy(a, b, mask): + return where(mask > 0, a * a + 2 * b, 0.0) + + +bundle_path = "example_b2o_bundle.b2z" +blosc2.remove_urlpath(bundle_path) + +# Build a portable bundle with ordinary arrays plus persisted lazy recipes. +with blosc2.DictStore(bundle_path, mode="w", threshold=1) as store: + store["/data/a"] = np.linspace(0.0, 1.0, 10, dtype=np.float32) + store["/data/b"] = np.linspace(1.0, 2.0, 10, dtype=np.float32) + store["/data/mask"] = np.asarray([0, 1, 1, 0, 1, 0, 1, 1, 0, 1], dtype=np.int8) + + # Reopen the array members through the store so operand refs point back to + # the .b2z container via dictstore_key payloads. + a = store["/data/a"] + b = store["/data/b"] + mask = store["/data/mask"] + + expr = blosc2.lazyexpr("(a - b) / (a + b + 1e-6)", operands={"a": a, "b": b}) + udf = blosc2.lazyudf(masked_energy, (a, b, mask), dtype=np.float32, shape=a.shape) + + # DictStore currently stores array-like external leaves, so persist the + # logical lazy objects through their b2o NDArray carriers. + store["/recipes/expr"] = blosc2.ndarray_from_cframe(expr.to_cframe()) + store["/recipes/udf"] = blosc2.ndarray_from_cframe(udf.to_cframe()) + +show("Bundle created", bundle_path) + +# Reopen the bundle read-only. The persisted LazyExpr and LazyUDF can be +# evaluated directly without re-saving, rebuilding, or re-deploying the .b2z. 
+with blosc2.open(bundle_path, mode="r") as store: + show("Read-only keys", sorted(store.keys())) + + expr = store["/recipes/expr"] + udf = store["/recipes/udf"] + + expr_result = expr.compute() + udf_result = udf.compute() + + show("Reopened expr type", type(expr).__name__) + show("Reopened udf type", type(udf).__name__) + show("Expr operand refs", expr.array.schunk.vlmeta["b2o"]["operands"]) + show("UDF operand refs", udf.array.schunk.vlmeta["b2o"]["operands"]) + show("Expr values", np.round(expr_result[:], 4)) + show("UDF values", udf_result[:]) + + expected_expr = (store["/data/a"][:] - store["/data/b"][:]) / ( + store["/data/a"][:] + store["/data/b"][:] + 1e-6 + ) + expected_udf = np.where( + store["/data/mask"][:] > 0, + store["/data/a"][:] ** 2 + 2 * store["/data/b"][:], + 0.0, + ).astype(np.float32) + np.testing.assert_allclose(expr_result[:], expected_expr, rtol=1e-6, atol=1e-6) + np.testing.assert_allclose(udf_result[:], expected_udf, rtol=1e-6, atol=1e-6) + show("Read-only evaluation", "ok") diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index 3eb96ce4..8ba00c5e 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -253,7 +253,7 @@ class FPAccuracy(Enum): The C-Blosc2 version's string.""" if IS_WASM: - from ._wasm_jit import init_wasm_jit_helpers + from .wasm_jit import init_wasm_jit_helpers _WASM_MINIEXPR_ENABLED = init_wasm_jit_helpers() @@ -538,6 +538,7 @@ def _raise(exc): from .batch_store import Batch, BatchStore from .vlarray import VLArray, vlarray_from_cframe from .ref import Ref +from .b2objects import open_b2object from .c2array import c2context, C2Array, URLPath @@ -548,7 +549,7 @@ def _raise(exc): lazyexpr, LazyArray, LazyUDF, - _open_lazyarray, + open_lazyarray, get_expr_operands, validate_expr, evaluate, diff --git a/src/blosc2/_msgpack_utils.py b/src/blosc2/_msgpack_utils.py deleted file mode 100644 index 9cdc8df4..00000000 --- a/src/blosc2/_msgpack_utils.py +++ /dev/null @@ -1,222 +0,0 @@ 
-####################################################################### -# Copyright (c) 2019-present, Blosc Development Team -# All rights reserved. -# -# SPDX-License-Identifier: BSD-3-Clause -####################################################################### - -from __future__ import annotations - -import builtins -import inspect -import linecache -import textwrap -from dataclasses import asdict - -import numpy as np -from msgpack import ExtType, packb, unpackb - -from blosc2 import blosc2_ext -from blosc2.dsl_kernel import DSLKernel -from blosc2.ref import Ref - -# Msgpack extension type codes are application-defined. Reserve code 42 in -# python-blosc2 for values serialized as Blosc2 CFrames via ``to_cframe()`` and -# reconstructed with ``blosc2.from_cframe()``. Keep this stable for backward -# compatibility with persisted msgpack payloads produced by this package. -_BLOSC2_EXT_CODE = 42 -# Reserve code 43 for structured Blosc2 reference objects that are not naturally -# serialized as CFrames. The payload is a msgpack-encoded mapping with a -# stable ``kind`` and ``version`` envelope. 
-_BLOSC2_STRUCTURED_EXT_CODE = 43 -_BLOSC2_STRUCTURED_VERSION = 1 -_BLOSC2_DSL_VERSION = 1 - - -def _encode_operand_reference(obj): - return Ref.from_object(obj).to_dict() - - -def _encode_structured_reference(obj): - import blosc2 - - if isinstance(obj, blosc2.Ref): - payload = {"kind": "ref", "version": _BLOSC2_STRUCTURED_VERSION, "ref": obj.to_dict()} - return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) - if isinstance(obj, blosc2.C2Array): - payload = _encode_operand_reference(obj) - return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) - if isinstance(obj, blosc2.LazyExpr): - expression = obj.expression_tosave if hasattr(obj, "expression_tosave") else obj.expression - operands = obj.operands_tosave if hasattr(obj, "operands_tosave") else obj.operands - payload = { - "kind": "lazyexpr", - "version": _BLOSC2_STRUCTURED_VERSION, - "expression": expression, - "operands": {key: _encode_operand_reference(value) for key, value in operands.items()}, - } - return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) - if isinstance(obj, blosc2.LazyUDF): - if not isinstance(obj.func, DSLKernel): - raise TypeError("Structured Blosc2 msgpack payload only supports LazyUDF backed by DSLKernel") - udf_func = obj.func.func - udf_name = getattr(udf_func, "__name__", obj.func.__name__) - try: - udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip() - except Exception: - udf_source = obj.func.dsl_source - if udf_source is None: - raise ValueError("Structured LazyUDF msgpack payload requires recoverable DSL kernel source") - kwargs = {} - for key, value in obj.kwargs.items(): - if key in {"dtype", "shape"}: - continue - if isinstance(value, blosc2.CParams | blosc2.DParams): - kwargs[key] = asdict(value) - else: - kwargs[key] = value - # Keep both source forms: - # - udf_source recreates the executable Python function object - # - dsl_source preserves the DSLKernel's normalized DSL metadata so the - 
# reconstructed function can keep its DSL identity and fast-path hints - payload = { - "kind": "lazyudf", - "version": _BLOSC2_STRUCTURED_VERSION, - "function_kind": "dsl", - "dsl_version": _BLOSC2_DSL_VERSION, - "name": udf_name, - "udf_source": udf_source, - "dsl_source": obj.func.dsl_source, - "dtype": np.dtype(obj.dtype).str, - "shape": list(obj.shape), - "operands": {f"o{i}": _encode_operand_reference(value) for i, value in enumerate(obj.inputs)}, - "kwargs": kwargs, - } - return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) - return None - - -def _decode_operand_reference(payload): - return Ref.from_dict(payload).open() - - -def _decode_structured_reference(data): - payload = unpackb(data) - if not isinstance(payload, dict): - raise TypeError("Structured Blosc2 msgpack payload must decode to a mapping") - - version = payload.get("version") - if version != _BLOSC2_STRUCTURED_VERSION: - raise ValueError(f"Unsupported structured Blosc2 msgpack payload version: {version!r}") - - kind = payload.get("kind") - if kind == "ref": - ref_payload = payload.get("ref") - return Ref.from_dict(ref_payload) - if kind == "c2array": - return _decode_operand_reference(payload) - if kind == "lazyexpr": - return _decode_structured_lazyexpr(payload) - if kind == "lazyudf": - return _decode_structured_lazyudf(payload) - raise ValueError(f"Unsupported structured Blosc2 msgpack payload kind: {kind!r}") - - -def _decode_structured_lazyexpr(payload): - import blosc2 - - expression = payload.get("expression") - if not isinstance(expression, str): - raise TypeError("Structured LazyExpr msgpack payload requires a string 'expression'") - operands_payload = payload.get("operands") - if not isinstance(operands_payload, dict): - raise TypeError("Structured LazyExpr msgpack payload requires a mapping 'operands'") - operands = {key: _decode_operand_reference(value) for key, value in operands_payload.items()} - return blosc2.lazyexpr(expression, operands=operands) - - 
-def _decode_structured_lazyudf(payload): - import blosc2 - - function_kind = payload.get("function_kind") - if function_kind != "dsl": - raise ValueError(f"Unsupported structured LazyUDF function kind: {function_kind!r}") - dsl_version = payload.get("dsl_version") - if dsl_version != _BLOSC2_DSL_VERSION: - raise ValueError(f"Unsupported structured LazyUDF DSL version: {dsl_version!r}") - udf_source = payload.get("udf_source") - if not isinstance(udf_source, str): - raise TypeError("Structured LazyUDF msgpack payload requires a string 'udf_source'") - name = payload.get("name") - if not isinstance(name, str): - raise TypeError("Structured LazyUDF msgpack payload requires a string 'name'") - dtype = payload.get("dtype") - if not isinstance(dtype, str): - raise TypeError("Structured LazyUDF msgpack payload requires a string 'dtype'") - shape_payload = payload.get("shape") - if not isinstance(shape_payload, list): - raise TypeError("Structured LazyUDF msgpack payload requires a list 'shape'") - operands_payload = payload.get("operands") - if not isinstance(operands_payload, dict): - raise TypeError("Structured LazyUDF msgpack payload requires a mapping 'operands'") - kwargs = payload.get("kwargs", {}) - if not isinstance(kwargs, dict): - raise TypeError("Structured LazyUDF msgpack payload requires a mapping 'kwargs'") - - local_ns = {} - filename = f"<{name}>" - safe_globals = { - "__builtins__": {k: v for k, v in builtins.__dict__.items() if k != "__import__"}, - "np": np, - "blosc2": blosc2, - } - linecache.cache[filename] = (len(udf_source), None, udf_source.splitlines(True), filename) - exec(compile(udf_source, filename, "exec"), safe_globals, local_ns) - func = local_ns[name] - if not isinstance(func, DSLKernel): - func = DSLKernel(func) - dsl_source = payload.get("dsl_source") - if dsl_source is not None and func.dsl_source is None: - func.dsl_source = dsl_source - - operands = tuple( - _decode_operand_reference(operands_payload[f"o{n}"]) for n in 
range(len(operands_payload)) - ) - return blosc2.lazyudf(func, operands, dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs) - - -def _encode_msgpack_ext(obj): - import blosc2 - - if isinstance( - obj, blosc2.NDArray | blosc2.SChunk | blosc2.VLArray | blosc2.BatchStore | blosc2.EmbedStore - ): - return ExtType(_BLOSC2_EXT_CODE, obj.to_cframe()) - structured = _encode_structured_reference(obj) - if structured is not None: - return structured - return blosc2_ext.encode_tuple(obj) - - -def msgpack_packb(value): - return packb(value, default=_encode_msgpack_ext, strict_types=True, use_bin_type=True) - - -def decode_tuple_list_hook(obj): - if obj and isinstance(obj[0], str) and obj[0] == "__tuple__": - return tuple(obj[1:]) - return obj - - -def _decode_msgpack_ext(code, data): - import blosc2 - - if code == _BLOSC2_EXT_CODE: - return blosc2.from_cframe(data, copy=True) - if code == _BLOSC2_STRUCTURED_EXT_CODE: - return _decode_structured_reference(data) - return ExtType(code, data) - - -def msgpack_unpackb(payload): - return unpackb(payload, list_hook=decode_tuple_list_hook, ext_hook=_decode_msgpack_ext) diff --git a/src/blosc2/b2objects.py b/src/blosc2/b2objects.py new file mode 100644 index 00000000..50c583e6 --- /dev/null +++ b/src/blosc2/b2objects.py @@ -0,0 +1,248 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +import builtins +import inspect +import linecache +import pathlib +import textwrap +from dataclasses import asdict +from typing import Any + +import numpy as np + +import blosc2 +from blosc2.dsl_kernel import DSLKernel + +_B2OBJECT_META_KEY = "b2o" +_B2OBJECT_VERSION = 1 +_B2OBJECT_DSL_VERSION = 1 +_B2OBJECT_USER_VLMETA_KEY = "_b2o_user_vlmeta" + + +def make_b2object_carrier( + kind: str, + shape, + dtype, + *, + chunks=None, + blocks=None, + **kwargs, +): + meta = dict(kwargs.pop("meta", {})) + meta[_B2OBJECT_META_KEY] = {"kind": kind, "version": _B2OBJECT_VERSION} + kwargs["meta"] = meta + return blosc2.empty(shape=shape, dtype=dtype, chunks=chunks, blocks=blocks, **kwargs) + + +def write_b2object_payload(array, payload: dict[str, Any]) -> None: + array.schunk.vlmeta[_B2OBJECT_META_KEY] = payload + + +def write_b2object_user_vlmeta(array, user_vlmeta: dict[str, Any]) -> None: + array.schunk.vlmeta[_B2OBJECT_USER_VLMETA_KEY] = user_vlmeta + + +def read_b2object_user_vlmeta(obj) -> dict[str, Any]: + schunk = getattr(obj, "schunk", obj) + if _B2OBJECT_USER_VLMETA_KEY not in schunk.vlmeta: + return {} + return schunk.vlmeta[_B2OBJECT_USER_VLMETA_KEY] + + +def encode_operand_reference(obj): + return blosc2.Ref.from_object(obj).to_dict() + + +def decode_operand_reference(payload, *, base_path=None): + if ( + payload.get("kind") in {"urlpath", "dictstore_key"} + and base_path is not None + and not pathlib.Path(payload["urlpath"]).is_absolute() + ): + payload = dict(payload) + payload["urlpath"] = (base_path / payload["urlpath"]).as_posix() + ref = blosc2.Ref.from_dict(payload) + return ref.open() + + +def encode_b2object_payload(obj) -> dict[str, Any] | None: + if isinstance(obj, blosc2.C2Array): + return blosc2.Ref.c2array_ref(obj.path, obj.urlbase).to_dict() + if isinstance(obj, blosc2.LazyExpr): + expression = 
obj.expression_tosave if hasattr(obj, "expression_tosave") else obj.expression + operands = obj.operands_tosave if hasattr(obj, "operands_tosave") else obj.operands + return { + "kind": "lazyexpr", + "version": _B2OBJECT_VERSION, + "expression": expression, + "operands": {key: encode_operand_reference(value) for key, value in operands.items()}, + } + if isinstance(obj, blosc2.LazyUDF): + if not isinstance(obj.func, DSLKernel): + raise TypeError("Structured Blosc2 msgpack payload only supports LazyUDF backed by DSLKernel") + udf_func = obj.func.func + udf_name = getattr(udf_func, "__name__", obj.func.__name__) + try: + udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip() + except Exception: + udf_source = obj.func.dsl_source + if udf_source is None: + raise ValueError("Structured LazyUDF msgpack payload requires recoverable DSL kernel source") + kwargs = {} + for key, value in obj.kwargs.items(): + if key in {"dtype", "shape"}: + continue + if isinstance(value, blosc2.CParams | blosc2.DParams): + kwargs[key] = asdict(value) + else: + kwargs[key] = value + return { + "kind": "lazyudf", + "version": _B2OBJECT_VERSION, + "function_kind": "dsl", + "dsl_version": _B2OBJECT_DSL_VERSION, + "name": udf_name, + "udf_source": udf_source, + "dtype": np.dtype(obj.dtype).str, + "shape": list(obj.shape), + "operands": {f"o{i}": encode_operand_reference(value) for i, value in enumerate(obj.inputs)}, + "kwargs": kwargs, + } + return None + + +def decode_b2object_payload(payload: dict[str, Any], *, carrier_path=None): + kind = payload.get("kind") + version = payload.get("version") + if version != _B2OBJECT_VERSION: + raise ValueError(f"Unsupported persisted Blosc2 object version: {version!r}") + if kind == "c2array": + ref = blosc2.Ref.from_dict(payload) + return ref.open() + if kind == "lazyexpr": + return decode_structured_lazyexpr(payload, carrier_path=carrier_path) + if kind == "lazyudf": + return decode_structured_lazyudf(payload, carrier_path=carrier_path) + 
raise ValueError(f"Unsupported persisted Blosc2 object kind: {kind!r}") + + +def decode_structured_lazyexpr(payload, *, carrier_path=None): + expression = payload.get("expression") + if not isinstance(expression, str): + raise TypeError("Structured LazyExpr payload requires a string 'expression'") + operands_payload = payload.get("operands") + if not isinstance(operands_payload, dict): + raise TypeError("Structured LazyExpr payload requires a mapping 'operands'") + operands, missing_ops = decode_operand_mapping(operands_payload, base_path=carrier_path) + if missing_ops: + exc = blosc2.exceptions.MissingOperands(expression, missing_ops) + exc.expr = expression + exc.missing_ops = missing_ops + raise exc + return blosc2.lazyexpr(expression, operands=operands) + + +def decode_operand_mapping(operands_payload, *, base_path=None): + operands = {} + missing_ops = {} + for key, value in operands_payload.items(): + try: + operands[key] = decode_operand_reference(value, base_path=base_path) + except FileNotFoundError: + ref = blosc2.Ref.from_dict(value) + if ref.kind in {"urlpath", "dictstore_key"}: + missing_ops[key] = pathlib.Path(ref.urlpath) + else: + raise + return operands, missing_ops + + +def decode_structured_lazyudf(payload, *, carrier_path=None): + function_kind = payload.get("function_kind") + if function_kind != "dsl": + raise ValueError(f"Unsupported structured LazyUDF function kind: {function_kind!r}") + dsl_version = payload.get("dsl_version") + if dsl_version != _B2OBJECT_DSL_VERSION: + raise ValueError(f"Unsupported structured LazyUDF DSL version: {dsl_version!r}") + udf_source = payload.get("udf_source") + if not isinstance(udf_source, str): + raise TypeError("Structured LazyUDF payload requires a string 'udf_source'") + name = payload.get("name") + if not isinstance(name, str): + raise TypeError("Structured LazyUDF payload requires a string 'name'") + dtype = payload.get("dtype") + if not isinstance(dtype, str): + raise TypeError("Structured LazyUDF 
payload requires a string 'dtype'") + shape_payload = payload.get("shape") + if not isinstance(shape_payload, list): + raise TypeError("Structured LazyUDF payload requires a list 'shape'") + operands_payload = payload.get("operands") + if not isinstance(operands_payload, dict): + raise TypeError("Structured LazyUDF payload requires a mapping 'operands'") + kwargs = payload.get("kwargs", {}) + if not isinstance(kwargs, dict): + raise TypeError("Structured LazyUDF payload requires a mapping 'kwargs'") + + local_ns = {} + filename = f"<{name}>" + safe_globals = { + "__builtins__": {k: v for k, v in builtins.__dict__.items() if k != "__import__"}, + "np": np, + "blosc2": blosc2, + } + linecache.cache[filename] = (len(udf_source), None, udf_source.splitlines(True), filename) + exec(compile(udf_source, filename, "exec"), safe_globals, local_ns) + func = local_ns[name] + if not isinstance(func, DSLKernel): + func = DSLKernel(func) + ordered_operands_payload = {f"o{n}": operands_payload[f"o{n}"] for n in range(len(operands_payload))} + operands, missing_ops = decode_operand_mapping(ordered_operands_payload, base_path=carrier_path) + if missing_ops: + exc = blosc2.exceptions.MissingOperands(name, missing_ops) + exc.expr = name + exc.missing_ops = missing_ops + raise exc + return blosc2.lazyudf( + func, tuple(operands.values()), dtype=np.dtype(dtype), shape=tuple(shape_payload), **kwargs + ) + + +def read_b2object_marker(obj) -> dict[str, Any] | None: + schunk = getattr(obj, "schunk", obj) + if _B2OBJECT_META_KEY not in schunk.meta: + return None + return schunk.meta[_B2OBJECT_META_KEY] + + +def read_b2object_payload(obj) -> dict[str, Any]: + schunk = getattr(obj, "schunk", obj) + return schunk.vlmeta[_B2OBJECT_META_KEY] + + +def open_b2object(obj): + marker = read_b2object_marker(obj) + if marker is None: + return None + + payload = read_b2object_payload(obj) + if marker.get("version") != _B2OBJECT_VERSION: + raise ValueError(f"Unsupported persisted Blosc2 object version: 
{marker.get('version')!r}") + if marker.get("kind") != payload.get("kind"): + raise ValueError("Persisted Blosc2 object marker/payload kind mismatch") + carrier_path = None + schunk = getattr(obj, "schunk", obj) + if getattr(schunk, "urlpath", None) is not None: + carrier_path = pathlib.Path(schunk.urlpath).parent + opened = decode_b2object_payload(payload, carrier_path=carrier_path) + if isinstance(opened, blosc2.LazyExpr | blosc2.LazyUDF): + opened.array = obj + opened.schunk = schunk + opened._set_user_vlmeta(read_b2object_user_vlmeta(obj), sync=False) + return opened diff --git a/src/blosc2/batch_store.py b/src/blosc2/batch_store.py index 23d7f0e3..4b93d670 100644 --- a/src/blosc2/batch_store.py +++ b/src/blosc2/batch_store.py @@ -18,8 +18,8 @@ import numpy as np import blosc2 -from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb from blosc2.info import InfoReporter, format_nbytes_info +from blosc2.msgpack_utils import msgpack_packb, msgpack_unpackb _BATCHSTORE_META = {"version": 1, "serializer": "msgpack", "items_per_block": None, "arrow_schema": None} _SUPPORTED_SERIALIZERS = {"msgpack", "arrow"} diff --git a/src/blosc2/blosc2_ext.pyx b/src/blosc2/blosc2_ext.pyx index 8a442eb7..2ba002e5 100644 --- a/src/blosc2/blosc2_ext.pyx +++ b/src/blosc2/blosc2_ext.pyx @@ -726,7 +726,7 @@ def destroy(): blosc2_destroy() -def _register_wasm_jit_helpers(uintptr_t instantiate_ptr, uintptr_t free_ptr): +def register_wasm_jit_helpers(uintptr_t instantiate_ptr, uintptr_t free_ptr): cdef me_wasm_jit_instantiate_helper instantiate_helper = ( instantiate_ptr ) diff --git a/src/blosc2/c2array.py b/src/blosc2/c2array.py index 11f7f6cb..af0229c4 100644 --- a/src/blosc2/c2array.py +++ b/src/blosc2/c2array.py @@ -18,6 +18,7 @@ import requests import blosc2 +from blosc2.b2objects import encode_b2object_payload, make_b2object_carrier, write_b2object_payload from blosc2.info import InfoReporter, format_nbytes_info _subscriber_data = { @@ -238,6 +239,37 @@ def __init__(self, 
path: str, /, urlbase: str | None = None, auth_token: str | N cparams.pop("filters, meta", None) self._cparams = blosc2.CParams(**cparams) + def _to_b2object_payload(self) -> dict: + payload = encode_b2object_payload(self) + if payload is None: + raise TypeError("Unsupported persisted Blosc2 object") + return payload + + def _to_b2object_carrier(self, **kwargs): + array = make_b2object_carrier( + "c2array", + self.shape, + self.dtype, + chunks=self.chunks, + blocks=self.blocks, + cparams=self.cparams, + **kwargs, + ) + write_b2object_payload(array, self._to_b2object_payload()) + return array + + def to_cframe(self) -> bytes: + """Serialize the remote array reference as a CFrame-backed Blosc2 object.""" + return self._to_b2object_carrier().to_cframe() + + def save(self, urlpath: str, contiguous: bool = True, **kwargs) -> None: + """Persist the remote array reference using a CFrame-backed carrier.""" + blosc2.blosc2_ext.check_access_mode(urlpath, "w") + kwargs["urlpath"] = urlpath + kwargs["contiguous"] = contiguous + kwargs["mode"] = "w" + self._to_b2object_carrier(**kwargs) + def __getitem__(self, slice_: int | slice | Sequence[slice]) -> np.ndarray: """ Get a slice of the array (returning NumPy array). diff --git a/src/blosc2/core.py b/src/blosc2/core.py index d574a21e..e3d9d4ed 100644 --- a/src/blosc2/core.py +++ b/src/blosc2/core.py @@ -1918,7 +1918,9 @@ def ndarray_from_cframe(cframe: bytes | str, copy: bool = False) -> blosc2.NDArr def from_cframe( cframe: bytes | str, copy: bool = True -) -> blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.BatchStore | blosc2.VLArray: +) -> ( + blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.BatchStore | blosc2.VLArray | blosc2.C2Array +): """Create a :ref:`EmbedStore `, :ref:`NDArray `, :ref:`SChunk `, :ref:`BatchStore ` or :ref:`VLArray ` instance from a contiguous frame buffer. 
@@ -1956,6 +1958,8 @@ def from_cframe( return blosc2.BatchStore(_from_schunk=schunk_from_cframe(cframe, copy=copy)) if "vlarray" in schunk.meta: return blosc2.vlarray_from_cframe(cframe, copy=copy) + if "b2o" in schunk.meta: + return blosc2.open_b2object(ndarray_from_cframe(cframe, copy=copy)) if "b2nd" in schunk.meta: return ndarray_from_cframe(cframe, copy=copy) return schunk_from_cframe(cframe, copy=copy) diff --git a/src/blosc2/dict_store.py b/src/blosc2/dict_store.py index 6c112504..21b5e821 100644 --- a/src/blosc2/dict_store.py +++ b/src/blosc2/dict_store.py @@ -19,7 +19,7 @@ import blosc2 from blosc2.c2array import C2Array from blosc2.embed_store import EmbedStore -from blosc2.schunk import SChunk, _process_opened_object +from blosc2.schunk import SChunk, process_opened_object if TYPE_CHECKING: from collections.abc import Iterator, Set @@ -227,30 +227,39 @@ def _opened_external_kind( rel_path: str, ) -> str | None: """Return the supported external leaf kind for an already opened object.""" - processed = _process_opened_object(opened) - if isinstance(processed, blosc2.BatchStore): - kind = "batchstore" - elif isinstance(processed, blosc2.VLArray): - kind = "vlarray" - elif isinstance(processed, blosc2.NDArray): + meta = getattr(opened, "schunk", opened).meta + if "b2o" in meta and isinstance(opened, blosc2.NDArray): + # Keep b2o carriers as NDArray external leaves during discovery. + # Rehydrating them here can recurse when a lazy recipe points back + # into the same DictStore via dictstore_key refs. 
kind = "ndarray" - elif isinstance(processed, SChunk): - kind = "schunk" + processed_name = type(opened).__name__ else: - warnings.warn( - f"Ignoring unsupported Blosc2 object at '{rel_path}' during DictStore discovery: " - f"{type(processed).__name__}", - UserWarning, - stacklevel=2, - ) - return None + processed = process_opened_object(opened) + processed_name = type(processed).__name__ + if isinstance(processed, blosc2.BatchStore): + kind = "batchstore" + elif isinstance(processed, blosc2.VLArray): + kind = "vlarray" + elif isinstance(processed, blosc2.NDArray): + kind = "ndarray" + elif isinstance(processed, SChunk): + kind = "schunk" + else: + warnings.warn( + f"Ignoring unsupported Blosc2 object at '{rel_path}' during DictStore discovery: " + f"{processed_name}", + UserWarning, + stacklevel=2, + ) + return None expected_ext = cls._expected_ext_from_kind(kind) found_ext = os.path.splitext(rel_path)[1] if found_ext != expected_ext: warnings.warn( f"External leaf '{rel_path}' uses extension '{found_ext}' but metadata resolves to " - f"{type(processed).__name__}; expected '{expected_ext}'.", + f"{processed_name}; expected '{expected_ext}'.", UserWarning, stacklevel=2, ) @@ -388,7 +397,20 @@ def __setitem__( # Save the value to the destination path if not external_file: - if hasattr(value, "save"): + if isinstance(value, blosc2.NDArray) and "b2o" in value.schunk.meta: + carrier = blosc2.empty( + value.shape, + value.dtype, + chunks=value.chunks, + blocks=value.blocks, + cparams=value.cparams, + urlpath=dest_path, + mode="w", + meta={"b2o": value.schunk.meta["b2o"]}, + ) + for meta_key, meta_value in value.schunk.vlmeta[:].items(): + carrier.schunk.vlmeta[meta_key] = meta_value + elif hasattr(value, "save"): value.save(urlpath=dest_path) else: # SChunk, VLArray and BatchStore can all be persisted via their cframe. 
@@ -425,7 +447,7 @@ def __getitem__( mmap_mode=self.mmap_mode, dparams=self.dparams, ) - return self._annotate_external_value(key, _process_opened_object(opened)) + return self._annotate_external_value(key, process_opened_object(opened)) else: urlpath = os.path.join(self.working_dir, filepath) if os.path.exists(urlpath): @@ -501,7 +523,7 @@ def values(self) -> Iterator[blosc2.NDArray | SChunk | C2Array]: offset = self.offsets[filepath]["offset"] yield self._annotate_external_value( key, - _process_opened_object( + process_opened_object( blosc2.blosc2_ext.open( self.b2z_path, mode="r", @@ -541,7 +563,7 @@ def items(self) -> Iterator[tuple[str, blosc2.NDArray | SChunk | C2Array]]: key, self._annotate_external_value( key, - _process_opened_object( + process_opened_object( blosc2.blosc2_ext.open( self.b2z_path, mode="r", diff --git a/src/blosc2/dsl_kernel.py b/src/blosc2/dsl_kernel.py index 59c64186..429d833c 100644 --- a/src/blosc2/dsl_kernel.py +++ b/src/blosc2/dsl_kernel.py @@ -257,7 +257,7 @@ def specialize_dsl_miniexpr_inputs(expr_string: str, operands: dict): return specialize_miniexpr_inputs(expr_string, operands) -class _DSLValidator: +class DSLValidator: _binop_map: ClassVar[dict[type[ast.operator], str]] = { ast.Add: "+", ast.Sub: "-", @@ -526,7 +526,7 @@ def _extract_dsl(self, func, validate: bool = True): if dsl_func is None: raise ValueError("No function definition found in sliced DSL source") if validate: - _DSLValidator(dsl_source).validate(dsl_func) + DSLValidator(dsl_source).validate(dsl_func) input_names = self._input_names_from_signature(dsl_func) if _PRINT_DSL_KERNEL: func_name = getattr(func, "__name__", "") @@ -613,7 +613,7 @@ def validate_dsl(func): } -class _DSLBuilder: +class DSLBuilder: _binop_map: ClassVar[dict[type[ast.operator], str]] = { ast.Add: "+", ast.Sub: "-", @@ -852,9 +852,9 @@ def _cmpop(self, op: ast.cmpop) -> str: raise ValueError("Unsupported comparison in DSL") -class _DSLReducer: - _binop_map: ClassVar[dict[type[ast.operator], 
str]] = _DSLBuilder._binop_map - _cmp_map: ClassVar[dict[type[ast.cmpop], str]] = _DSLBuilder._cmp_map +class DSLReducer: + _binop_map: ClassVar[dict[type[ast.operator], str]] = DSLBuilder._binop_map + _cmp_map: ClassVar[dict[type[ast.cmpop], str]] = DSLBuilder._cmp_map def __init__(self, max_unroll: int = 64): self._env: dict[str, str] = {} diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index c6893686..63d656bc 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -24,6 +24,7 @@ import textwrap import threading from abc import ABC, abstractmethod, abstractproperty +from collections.abc import MutableMapping from dataclasses import asdict from enum import Enum from pathlib import Path @@ -42,22 +43,29 @@ import blosc2 -from .dsl_kernel import DSLKernel, DSLSyntaxError, _DSLValidator, specialize_miniexpr_inputs +from .b2objects import ( + encode_b2object_payload, + make_b2object_carrier, + read_b2object_user_vlmeta, + write_b2object_payload, + write_b2object_user_vlmeta, +) +from .dsl_kernel import DSLKernel, DSLSyntaxError, DSLValidator, specialize_miniexpr_inputs if blosc2._HAS_NUMBA: import numba + from blosc2 import compute_chunks_blocks from blosc2.info import InfoReporter -from .proxy import _convert_dtype +from .proxy import convert_dtype from .utils import ( - _format_expr_scalar, - _get_chunk_operands, - _sliced_chunk_iter, check_smaller_shape, compute_smaller_slice, constructors, elementwise_funcs, + format_expr_scalar, + get_chunk_operands, get_chunks_idx, get_intersecting_chunks, infer_shape, @@ -68,6 +76,7 @@ process_key, reducers, safe_numpy_globals, + sliced_chunk_iter, try_miniexpr, ) @@ -275,7 +284,7 @@ def get_expr_globals(expression): if not hasattr(enum, "member"): # copy-pasted from Lib/enum.py - class _mymember: + class MyMember: """ Forces item to become an Enum member during class creation. 
""" @@ -283,7 +292,7 @@ class _mymember: def __init__(self, value): self.value = value else: - _mymember = enum.member # only available after python 3.11 + MyMember = enum.member # only available after python 3.11 class ReduceOp(Enum): @@ -293,25 +302,25 @@ class ReduceOp(Enum): # wrap as enum.member so that Python doesn't treat some funcs # as class methods (rather than Enum members) - SUM = _mymember(np.add) - PROD = _mymember(np.multiply) - MEAN = _mymember(np.mean) - STD = _mymember(np.std) - VAR = _mymember(np.var) + SUM = MyMember(np.add) + PROD = MyMember(np.multiply) + MEAN = MyMember(np.mean) + STD = MyMember(np.std) + VAR = MyMember(np.var) # Computing a median from partial results is not straightforward because the median # is a positional statistic, which means it depends on the relative ordering of all # the data points. Unlike statistics such as the sum or mean, you can't compute a median # from partial results without knowing the entire dataset, and this is way too expensive # for arrays that cannot typically fit in-memory (e.g. disk-based NDArray). 
# MEDIAN = np.median - MAX = _mymember(np.maximum) - MIN = _mymember(np.minimum) - ANY = _mymember(np.any) - ALL = _mymember(np.all) - ARGMAX = _mymember(np.argmax) - ARGMIN = _mymember(np.argmin) - CUMULATIVE_SUM = _mymember(npcumsum) - CUMULATIVE_PROD = _mymember(npcumprod) + MAX = MyMember(np.maximum) + MIN = MyMember(np.minimum) + ANY = MyMember(np.any) + ALL = MyMember(np.all) + ARGMAX = MyMember(np.argmax) + ARGMIN = MyMember(np.argmin) + CUMULATIVE_SUM = MyMember(npcumsum) + CUMULATIVE_PROD = MyMember(npcumprod) class LazyArrayEnum(Enum): @@ -323,7 +332,64 @@ class LazyArrayEnum(Enum): UDF = 1 +class LazyArrayVLMeta(MutableMapping): + """User metadata attached to a LazyArray.""" + + def __init__(self, lazyarr: LazyArray): + self.lazyarr = lazyarr + + def __getitem__(self, key): + return self.lazyarr._get_user_vlmeta()[key] + + def __setitem__(self, key, value): + data = self.lazyarr._get_user_vlmeta() + data[key] = value + self.lazyarr._sync_user_vlmeta() + + def __delitem__(self, key): + data = self.lazyarr._get_user_vlmeta() + del data[key] + self.lazyarr._sync_user_vlmeta() + + def __iter__(self): + return iter(self.lazyarr._get_user_vlmeta()) + + def __len__(self): + return len(self.lazyarr._get_user_vlmeta()) + + def getall(self): + return self.lazyarr._get_user_vlmeta().copy() + + def __repr__(self): + return repr(self.getall()) + + def __str__(self): + return str(self.getall()) + + class LazyArray(ABC, blosc2.Operand): + def _get_user_vlmeta(self) -> dict[str, Any]: + if not hasattr(self, "_vlmeta_user"): + self._vlmeta_user = {} + return self._vlmeta_user + + def _set_user_vlmeta(self, metadata: dict[str, Any], *, sync: bool = True) -> None: + self._vlmeta_user = dict(metadata) + if sync: + self._sync_user_vlmeta() + + def _sync_user_vlmeta(self) -> None: + array = getattr(self, "array", None) + if array is not None: + write_b2object_user_vlmeta(array, self._get_user_vlmeta()) + + @property + def vlmeta(self) -> LazyArrayVLMeta: + """User 
variable-length metadata for this LazyArray.""" + if not hasattr(self, "_vlmeta_proxy"): + self._vlmeta_proxy = LazyArrayVLMeta(self) + return self._vlmeta_proxy + @abstractmethod def indices(self, order: str | list[str] | None = None) -> blosc2.LazyArray: """ @@ -492,6 +558,9 @@ def save(self, **kwargs: Any) -> None: section for more info). * This is currently only supported for :ref:`LazyExpr` and :ref:`LazyUDF` (including kernels decorated with :func:`blosc2.dsl_kernel`). + * User metadata can be attached via :attr:`vlmeta`. For in-memory LazyArrays + this stays in memory; for persisted LazyArrays it is serialized and restored + on reopen. Examples -------- @@ -570,7 +639,7 @@ def convert_inputs(inputs): return [] inputs_ = [] for obj in inputs: - if not isinstance(obj, (np.ndarray, blosc2.Operand)) and not np.isscalar(obj): + if not isinstance(obj, np.ndarray | blosc2.Operand) and not np.isscalar(obj): try: obj = blosc2.SimpleProxy(obj) except Exception: @@ -1116,7 +1185,7 @@ async def async_read_chunks(arrs, info, queue): my_chunk_iter = range(arrs[0].schunk.nchunks) if len(info) == 5: if info[-1] is not None: - my_chunk_iter = _sliced_chunk_iter(chunks_, (), shape, axis=info[-1], nchunk=True) + my_chunk_iter = sliced_chunk_iter(chunks_, (), shape, axis=info[-1], nchunk=True) info = info[:4] for i, nchunk in enumerate(my_chunk_iter): futures = [ @@ -1290,7 +1359,7 @@ def _format_dsl_parse_error_hint(expr_text: str, backend_msg: str): line_no = expr_text.count("\n", 0, err_pos) + 1 line_start = expr_text.rfind("\n", 0, err_pos) + 1 col_no = err_pos - line_start + 1 - dump = _DSLValidator(expr_text)._format_source_with_pointer(line_no, col_no) + dump = DSLValidator(expr_text)._format_source_with_pointer(line_no, col_no) return f"Parse error location (line {line_no}, col {col_no}, offset {err_pos}):\n{dump}" @@ -1787,7 +1856,7 @@ def slices_eval( # noqa: C901 ndindex.ndindex(cslice).as_subindex(_slice).raw ) # in the case _slice=(), just gives cslice - 
_get_chunk_operands(operands, cslice, chunk_operands, shape) + get_chunk_operands(operands, cslice, chunk_operands, shape) if out is None: shape_ = shape_slice if shape_slice is not None else shape @@ -2336,7 +2405,7 @@ def reduce_slices( # noqa: C901 axis=reduce_args["axis"] if np.isscalar(reduce_args["axis"]) else None, ) else: - _get_chunk_operands(operands, cslice, chunk_operands, shape) + get_chunk_operands(operands, cslice, chunk_operands, shape) if reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}: reduced_slice = ( @@ -2855,9 +2924,9 @@ def result_type( # Follow NumPy rules for scalar-array operations # Create small arrays with the same dtypes and let NumPy's type promotion determine the result type arrs = [ - (np.array(value).dtype if isinstance(value, (str, bytes)) else value) + (np.array(value).dtype if isinstance(value, str | bytes) else value) if (np.isscalar(value) or not hasattr(value, "dtype")) - else np.array([0], dtype=_convert_dtype(value.dtype)) + else np.array([0], dtype=convert_dtype(value.dtype)) for value in arrays_and_dtypes ] return np.result_type(*arrs) @@ -2902,7 +2971,7 @@ def __init__(self, new_op): # noqa: C901 # Check that operands are proper Operands, LazyArray or scalars; if not, convert to NDArray objects value1 = ( blosc2.SimpleProxy(value1) - if not (isinstance(value1, (blosc2.Operand, np.ndarray)) or np.isscalar(value1)) + if not (isinstance(value1, blosc2.Operand | np.ndarray) or np.isscalar(value1)) else value1 ) # Reset values represented as np.int64 etc. to be set as Python natives @@ -2926,7 +2995,7 @@ def __init__(self, new_op): # noqa: C901 return value2 = ( blosc2.SimpleProxy(value2) - if not (isinstance(value2, (blosc2.Operand, np.ndarray)) or np.isscalar(value2)) + if not (isinstance(value2, blosc2.Operand | np.ndarray) or np.isscalar(value2)) else value2 ) # Reset values represented as np.int64 etc. 
to be set as Python natives @@ -2944,15 +3013,15 @@ def __init__(self, new_op): # noqa: C901 elif op in funcs_2args: if np.isscalar(value1) and np.isscalar(value2): self.expression = "o0" - svalue1 = _format_expr_scalar(value1) - svalue2 = _format_expr_scalar(value2) + svalue1 = format_expr_scalar(value1) + svalue2 = format_expr_scalar(value2) self.operands = {"o0": ne_evaluate(f"{op}({svalue1}, {svalue2})")} # eager evaluation elif np.isscalar(value2): self.operands = {"o0": value1} - self.expression = f"{op}(o0, {_format_expr_scalar(value2)})" + self.expression = f"{op}(o0, {format_expr_scalar(value2)})" elif np.isscalar(value1): self.operands = {"o0": value2} - self.expression = f"{op}({_format_expr_scalar(value1)}, o0)" + self.expression = f"{op}({format_expr_scalar(value1)}, o0)" else: self.operands = {"o0": value1, "o1": value2} self.expression = f"{op}(o0, o1)" @@ -3026,9 +3095,9 @@ def update_expr(self, new_op): # noqa: C901 def_operands = value1.operands elif isinstance(value1, LazyExpr): if np.isscalar(value2): - v2 = _format_expr_scalar(value2) + v2 = format_expr_scalar(value2) elif hasattr(value2, "shape") and value2.shape == (): - v2 = _format_expr_scalar(value2[()]) + v2 = format_expr_scalar(value2[()]) else: operand_to_key = {id(v): k for k, v in value1.operands.items()} try: @@ -3047,9 +3116,9 @@ def update_expr(self, new_op): # noqa: C901 def_operands = value1.operands else: if np.isscalar(value1): - v1 = _format_expr_scalar(value1) + v1 = format_expr_scalar(value1) elif hasattr(value1, "shape") and value1.shape == (): - v1 = _format_expr_scalar(value1[()]) + v1 = format_expr_scalar(value1[()]) else: operand_to_key = {id(v): k for k, v in value2.operands.items()} try: @@ -3711,44 +3780,59 @@ def save(self, urlpath=None, **kwargs): if urlpath is None: raise ValueError("To save a LazyArray you must provide an urlpath") - expression = self.expression_tosave if hasattr(self, "expression_tosave") else self.expression - operands_ = self.operands_tosave 
if hasattr(self, "operands_tosave") else self.operands - # Validate expression - validate_expr(expression) - - meta = kwargs.get("meta", {}) - meta["LazyArray"] = LazyArrayEnum.Expr.value kwargs["urlpath"] = urlpath - kwargs["meta"] = meta kwargs["mode"] = "w" # always overwrite the file in urlpath + self._to_b2object_carrier(**kwargs) - # Create an empty array; useful for providing the shape and dtype of the outcome - array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs) + def to_cframe(self) -> bytes: + return self._to_b2object_carrier().to_cframe() - # Save the expression and operands in the metadata - operands = {} + def _to_b2object_carrier(self, **kwargs): + expression = self.expression_tosave if hasattr(self, "expression_tosave") else self.expression + operands_ = self.operands_tosave if hasattr(self, "operands_tosave") else self.operands + validate_expr(expression) + + payload = {"kind": "lazyexpr", "version": 1, "expression": expression, "operands": {}} + carrier_urlpath = kwargs.get("urlpath") + carrier_parent = Path(carrier_urlpath).parent if carrier_urlpath is not None else None for key, value in operands_.items(): if isinstance(value, blosc2.C2Array): - operands[key] = { - "path": str(value.path), - "urlbase": value.urlbase, - } + payload["operands"][key] = encode_b2object_payload(value) continue if isinstance(value, blosc2.Proxy): - # Take the required info from the Proxy._cache container value = value._cache + ref = getattr(value, "_blosc2_ref", None) + if isinstance(ref, blosc2.Ref): + payload["operands"][key] = ref.to_dict() + continue if not hasattr(value, "schunk"): raise ValueError( "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" ) if value.schunk.urlpath is None: raise ValueError("To save a LazyArray, all operands must be stored on disk/network") - operands[key] = value.schunk.urlpath - array.schunk.vlmeta["_LazyArray"] = { - "expression": expression, - "UDF": None, - "operands": operands, - } + 
operand_urlpath = Path(value.schunk.urlpath) + if carrier_parent is not None and not operand_urlpath.is_absolute(): + ref_urlpath = operand_urlpath.as_posix() + elif carrier_parent is not None: + try: + ref_urlpath = operand_urlpath.relative_to(carrier_parent).as_posix() + except ValueError: + ref_urlpath = operand_urlpath.as_posix() + else: + ref_urlpath = operand_urlpath.as_posix() + payload["operands"][key] = {"kind": "urlpath", "version": 1, "urlpath": ref_urlpath} + array = make_b2object_carrier( + "lazyexpr", + self.shape, + self.dtype, + chunks=self.chunks, + blocks=self.blocks, + **kwargs, + ) + write_b2object_payload(array, payload) + write_b2object_user_vlmeta(array, self._get_user_vlmeta()) + return array @classmethod def _new_expr(cls, expression, operands, guess, out=None, where=None, ne_args=None): @@ -3764,7 +3848,7 @@ def _new_expr(cls, expression, operands, guess, out=None, where=None, ne_args=No _operands = operands | local_vars # Check that operands are proper Operands, LazyArray or scalars; if not, convert to NDArray objects for op, val in _operands.items(): - if not (isinstance(val, (blosc2.Operand, np.ndarray)) or np.isscalar(val)): + if not (isinstance(val, blosc2.Operand | np.ndarray) or np.isscalar(val)): _operands[op] = blosc2.SimpleProxy(val) # for scalars just return value (internally converts to () if necessary) opshapes = {k: v if not hasattr(v, "shape") else v.shape for k, v in _operands.items()} @@ -4060,50 +4144,93 @@ def save(self, urlpath=None, **kwargs): if urlpath is None: raise ValueError("To save a LazyArray you must provide an urlpath") - meta = kwargs.get("meta", {}) - meta["LazyArray"] = LazyArrayEnum.UDF.value kwargs["urlpath"] = urlpath - kwargs["meta"] = meta kwargs["mode"] = "w" # always overwrite the file in urlpath + try: + self._to_b2object_carrier(**kwargs) + except (TypeError, ValueError): + meta = kwargs.get("meta", {}) + meta["LazyArray"] = LazyArrayEnum.UDF.value + kwargs["meta"] = meta - # Create an empty 
array; useful for providing the shape and dtype of the outcome - array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs) + # Create an empty array; useful for providing the shape and dtype of the outcome + array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs) - # Save the expression and operands in the metadata - operands = {} - operands_ = self.inputs_dict - for i, (_key, value) in enumerate(operands_.items()): - pos_key = f"o{i}" # always use positional keys for consistent loading - if isinstance(value, blosc2.C2Array): - operands[pos_key] = { - "path": str(value.path), - "urlbase": value.urlbase, - } + # Save the expression and operands in the metadata + operands = {} + operands_ = self.inputs_dict + for i, (_key, value) in enumerate(operands_.items()): + pos_key = f"o{i}" # always use positional keys for consistent loading + if isinstance(value, blosc2.C2Array): + operands[pos_key] = { + "path": str(value.path), + "urlbase": value.urlbase, + } + continue + if isinstance(value, blosc2.Proxy): + # Take the required info from the Proxy._cache container + value = value._cache + if not hasattr(value, "schunk"): + raise ValueError( + "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" + ) from None + if value.schunk.urlpath is None: + raise ValueError( + "To save a LazyArray, all operands must be stored on disk/network" + ) from None + operands[pos_key] = value.schunk.urlpath + udf_func = self.func.func if isinstance(self.func, DSLKernel) else self.func + udf_name = getattr(udf_func, "__name__", self.func.__name__) + try: + udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip() + except Exception: + udf_source = None + meta = { + "UDF": udf_source, + "operands": operands, + "name": udf_name, + } + if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None: + meta["dsl_source"] = self.func.dsl_source + array.schunk.vlmeta["_LazyArray"] = meta + write_b2object_user_vlmeta(array, 
self._get_user_vlmeta()) + + def to_cframe(self) -> bytes: + return self._to_b2object_carrier().to_cframe() + + def _to_b2object_carrier(self, **kwargs): + payload = encode_b2object_payload(self) + if payload is None: + raise TypeError("Persistent Blosc2 object payload is not supported for this LazyUDF") + + carrier_urlpath = kwargs.get("urlpath") + carrier_parent = Path(carrier_urlpath).parent if carrier_urlpath is not None else None + for operand_payload in payload["operands"].values(): + if operand_payload["kind"] not in {"urlpath", "dictstore_key"}: continue - if isinstance(value, blosc2.Proxy): - # Take the required info from the Proxy._cache container - value = value._cache - if not hasattr(value, "schunk"): - raise ValueError( - "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" - ) - if value.schunk.urlpath is None: - raise ValueError("To save a LazyArray, all operands must be stored on disk/network") - operands[pos_key] = value.schunk.urlpath - udf_func = self.func.func if isinstance(self.func, DSLKernel) else self.func - udf_name = getattr(udf_func, "__name__", self.func.__name__) - try: - udf_source = textwrap.dedent(inspect.getsource(udf_func)).lstrip() - except Exception: - udf_source = None - meta = { - "UDF": udf_source, - "operands": operands, - "name": udf_name, - } - if isinstance(self.func, DSLKernel) and self.func.dsl_source is not None: - meta["dsl_source"] = self.func.dsl_source - array.schunk.vlmeta["_LazyArray"] = meta + operand_urlpath = Path(operand_payload["urlpath"]) + if carrier_parent is not None and not operand_urlpath.is_absolute(): + ref_urlpath = operand_urlpath.as_posix() + elif carrier_parent is not None: + try: + ref_urlpath = operand_urlpath.relative_to(carrier_parent).as_posix() + except ValueError: + ref_urlpath = operand_urlpath.as_posix() + else: + ref_urlpath = operand_urlpath.as_posix() + operand_payload["urlpath"] = ref_urlpath + + array = make_b2object_carrier( + "lazyudf", + 
self.shape, + self.dtype, + chunks=self.chunks, + blocks=self.blocks, + **kwargs, + ) + write_b2object_payload(array, payload) + write_b2object_user_vlmeta(array, self._get_user_vlmeta()) + return array def _numpy_eval_expr(expression, operands, prefer_blosc=False): @@ -4413,7 +4540,7 @@ def _reconstruct_lazyudf(expr, lazyarray, operands_dict, array): ) -def _open_lazyarray(array): +def open_lazyarray(array): value = array.schunk.meta["LazyArray"] lazyarray = array.schunk.vlmeta["_LazyArray"] if value == LazyArrayEnum.Expr.value: @@ -4461,6 +4588,7 @@ def _open_lazyarray(array): new_expr.array = array # We want to expose schunk too, so that .info() can be used on the LazyArray new_expr.schunk = array.schunk + new_expr._set_user_vlmeta(read_b2object_user_vlmeta(array), sync=False) return new_expr diff --git a/src/blosc2/msgpack_utils.py b/src/blosc2/msgpack_utils.py new file mode 100644 index 00000000..0dba1d9d --- /dev/null +++ b/src/blosc2/msgpack_utils.py @@ -0,0 +1,92 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +from msgpack import ExtType, packb, unpackb + +from blosc2 import blosc2_ext +from blosc2.b2objects import decode_b2object_payload, encode_b2object_payload +from blosc2.ref import Ref + +# Msgpack extension type codes are application-defined. Reserve code 42 in +# python-blosc2 for values serialized as Blosc2 CFrames via ``to_cframe()`` and +# reconstructed with ``blosc2.from_cframe()``. Keep this stable for backward +# compatibility with persisted msgpack payloads produced by this package. +_BLOSC2_EXT_CODE = 42 +# Reserve code 43 for structured Blosc2 reference objects that are not naturally +# serialized as CFrames. 
The payload is a msgpack-encoded mapping with a +# stable ``kind`` and ``version`` envelope. +_BLOSC2_STRUCTURED_EXT_CODE = 43 +_BLOSC2_STRUCTURED_VERSION = 1 + + +def _encode_structured_reference(obj): + import blosc2 + + if isinstance(obj, blosc2.Ref): + payload = {"kind": "ref", "version": _BLOSC2_STRUCTURED_VERSION, "ref": obj.to_dict()} + return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) + payload = encode_b2object_payload(obj) + if payload is not None: + return ExtType(_BLOSC2_STRUCTURED_EXT_CODE, packb(payload, use_bin_type=True)) + return None + + +def _decode_structured_reference(data): + payload = unpackb(data) + if not isinstance(payload, dict): + raise TypeError("Structured Blosc2 msgpack payload must decode to a mapping") + + version = payload.get("version") + if version != _BLOSC2_STRUCTURED_VERSION: + raise ValueError(f"Unsupported structured Blosc2 msgpack payload version: {version!r}") + + kind = payload.get("kind") + if kind == "ref": + ref_payload = payload.get("ref") + return Ref.from_dict(ref_payload) + if kind in {"c2array", "lazyexpr", "lazyudf"}: + return decode_b2object_payload(payload) + raise ValueError(f"Unsupported structured Blosc2 msgpack payload kind: {kind!r}") + + +def _encode_msgpack_ext(obj): + import blosc2 + + if isinstance( + obj, blosc2.NDArray | blosc2.SChunk | blosc2.VLArray | blosc2.BatchStore | blosc2.EmbedStore + ): + return ExtType(_BLOSC2_EXT_CODE, obj.to_cframe()) + structured = _encode_structured_reference(obj) + if structured is not None: + return structured + return blosc2_ext.encode_tuple(obj) + + +def msgpack_packb(value): + return packb(value, default=_encode_msgpack_ext, strict_types=True, use_bin_type=True) + + +def decode_tuple_list_hook(obj): + if obj and isinstance(obj[0], str) and obj[0] == "__tuple__": + return tuple(obj[1:]) + return obj + + +def _decode_msgpack_ext(code, data): + import blosc2 + + if code == _BLOSC2_EXT_CODE: + return blosc2.from_cframe(data, copy=True) + if 
code == _BLOSC2_STRUCTURED_EXT_CODE: + return _decode_structured_reference(data) + return ExtType(code, data) + + +def msgpack_unpackb(payload): + return unpackb(payload, list_hook=decode_tuple_list_hook, ext_hook=_decode_msgpack_ext) diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index 4c35cef6..c972e03a 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -34,10 +34,10 @@ from .linalg import matmul from .utils import ( - _get_local_slice, - _get_selection, - _incomplete_lazyfunc, get_chunks_idx, + get_local_slice, + get_selection, + incomplete_lazyfunc, npbinvert, nplshift, nprshift, @@ -157,7 +157,7 @@ def is_inside_new_expr() -> bool: """ # Get the current call stack stack = inspect.stack() - return builtins.any(frame_info.function in {"_new_expr", "_open_lazyarray"} for frame_info in stack) + return builtins.any(frame_info.function in {"_new_expr", "open_lazyarray"} for frame_info in stack) def make_key_hashable(key): @@ -1834,7 +1834,7 @@ def imag(ndarr: blosc2.Array, /) -> blosc2.LazyExpr: return blosc2.LazyExpr(new_op=(ndarr, "imag", None)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def contains(ndarr: blosc2.Array, value: str | bytes | blosc2.Array, /) -> blosc2.LazyExpr: """ Check if the array contains a specified value. 
@@ -3060,7 +3060,7 @@ def remainder( return blosc2.LazyExpr(new_op=(x1, "%", x2)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def clip( x: blosc2.Array, min: int | float | blosc2.Array | None = None, @@ -3102,7 +3102,7 @@ def chunkwise_clip(inputs, output, offset): return blosc2.lazyudf(chunkwise_clip, (x, min, max), dtype=dtype, shape=shape, **kwargs) -@_incomplete_lazyfunc +@incomplete_lazyfunc def logaddexp(x1: int | float | blosc2.Array, x2: int | float | blosc2.Array, **kwargs: Any) -> NDArray: """ Calculates the logarithm of the sum of exponentiations log(exp(x1) + exp(x2)) for @@ -4036,7 +4036,7 @@ def get_fselection_numpy(self, key: list | np.ndarray) -> np.ndarray: # now all indices are slices or arrays of integers (or booleans) # # moreover, all arrays are consecutive (otherwise an error is raised) - if np.all([isinstance(s, (slice, np.ndarray)) for s in _slice]) and np.all( + if np.all([isinstance(s, slice | np.ndarray) for s in _slice]) and np.all( [s.dtype is not bool for s in _slice if isinstance(s, np.ndarray)] ): chunks = np.array(chunks) @@ -4123,14 +4123,14 @@ def get_fselection_numpy(self, key: list | np.ndarray) -> np.ndarray: # loop over chunks coming from slices before and after array indices for cprior_tuple in product(*cprior_slices): - out_prior_selection, prior_selection, loc_prior_selection = _get_selection( + out_prior_selection, prior_selection, loc_prior_selection = get_selection( cprior_tuple, prior_tuple, chunks[:begin] ) for cpost_tuple in product(*cpost_slices): - out_post_selection, post_selection, loc_post_selection = _get_selection( + out_post_selection, post_selection, loc_post_selection = get_selection( cpost_tuple, post_tuple, chunks[end:] ) - locbegin, locend = _get_local_slice( + locbegin, locend = get_local_slice( prior_selection, post_selection, (chunk_begin, chunk_end) ) to_be_loaded = np.empty(locend - locbegin, dtype=self.dtype) @@ -4202,10 +4202,10 @@ def _get_set_nonunit_steps(self, _slice, out=None, value=None): 
slice_to_chunktuple(s, c) for s, c in zip(_slice, chunks, strict=True) ] # internally handles negative steps for c in product(*intersecting_chunks): - sel_idx, glob_selection, sub_idx = _get_selection(c, _slice, chunks) + sel_idx, glob_selection, sub_idx = get_selection(c, _slice, chunks) sel_idx = tuple(s for s, m in zip(sel_idx, mask, strict=True) if not m) sub_idx = tuple(s if not m else s.start for s, m in zip(sub_idx, mask, strict=True)) - locstart, locstop = _get_local_slice( + locstart, locstop = get_local_slice( glob_selection, (), ((), ()), # switches start and stop for negative steps @@ -4298,8 +4298,8 @@ def __getitem__( key = key[()] if hasattr(key, "shape") and key.shape == () else key # convert to scalar # fancy indexing - if isinstance(key_, (list, np.ndarray)) or builtins.any( - isinstance(k, (list, np.ndarray)) for k in key_ + if isinstance(key_, list | np.ndarray) or builtins.any( + isinstance(k, list | np.ndarray) for k in key_ ): # check scalar booleans, which add 1 dim to beginning if np.issubdtype(type(key), bool) and np.isscalar(key): @@ -4397,7 +4397,7 @@ def __setitem__( value if np.isscalar(value) else blosc2.as_simpleproxy(value) ) # convert to SimpleProxy for e.g. 
JAX, Tensorflow, PyTorch - if builtins.any(isinstance(k, (list, np.ndarray)) for k in key_): # fancy indexing + if builtins.any(isinstance(k, list | np.ndarray) for k in key_): # fancy indexing _slice = ndindex.ndindex(key_).expand( self.shape ) # handles negative indices -> positive internally @@ -4969,7 +4969,7 @@ def where( return condition.where(x, y) -@_incomplete_lazyfunc +@incomplete_lazyfunc def startswith( a: str | blosc2.Array, prefix: str | blosc2.Array ) -> NDArray: # start: int = 0, end: int | None = None, **kwargs) @@ -5009,7 +5009,7 @@ def startswith( return blosc2.LazyExpr(new_op=(a, "startswith", prefix)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def endswith( a: str | blosc2.Array, suffix: str | blosc2.Array ) -> NDArray: # start: int = 0, end: int | None = None, **kwargs) -> NDArray: @@ -5048,7 +5048,7 @@ def endswith( return blosc2.LazyExpr(new_op=(a, "endswith", suffix)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def lower(a: str | blosc2.Array) -> NDArray: """ Copy-pasted from numpy documentation: https://numpy.org/doc/stable/reference/generated/numpy.char.lower.html @@ -5072,7 +5072,7 @@ def lower(a: str | blosc2.Array) -> NDArray: return blosc2.LazyExpr(new_op=(a, "lower", None)) -@_incomplete_lazyfunc +@incomplete_lazyfunc def upper(a: str | blosc2.Array) -> NDArray: """ Copy-pasted from numpy documentation: https://numpy.org/doc/stable/reference/generated/numpy.char.upper.html @@ -6003,7 +6003,7 @@ def asarray(array: Sequence | blosc2.Array, copy: bool | None = None, **kwargs: raise ValueError("Only unsafe casting is supported at the moment.") if not hasattr(array, "shape"): array = np.asarray(array) # defaults if dtype=None - dtype_ = blosc2.proxy._convert_dtype(array.dtype) + dtype_ = blosc2.proxy.convert_dtype(array.dtype) dtype = kwargs.pop("dtype", dtype_) # check if dtype provided kwargs = _check_ndarray_kwargs(**kwargs) chunks = kwargs.pop("chunks", None) @@ -6013,7 +6013,7 @@ def asarray(array: Sequence | blosc2.Array, copy: 
bool | None = None, **kwargs: chunks = array.chunks # Zarr adds a .blocks property that maps to a zarr.indexing.BlockIndex object # Let's avoid this - if blocks is None and hasattr(array, "blocks") and isinstance(array.blocks, (tuple, list)): + if blocks is None and hasattr(array, "blocks") and isinstance(array.blocks, tuple | list): blocks = array.blocks copy = True if copy is None and not isinstance(array, NDArray) else copy @@ -6572,7 +6572,7 @@ def take(x: blosc2.Array, indices: blosc2.Array, axis: int | None = None) -> NDA raise ValueError("Must specify axis parameter if x is not 1D.") if axis < 0: axis += x.ndim - if not isinstance(axis, (int, np.integer)): + if not isinstance(axis, int | np.integer): raise ValueError("Axis must be integer.") if isinstance(indices, list): indices = np.asarray(indices) @@ -6604,7 +6604,7 @@ def take_along_axis(x: blosc2.Array, indices: blosc2.Array, axis: int = -1) -> N out: NDArray Selected indices of x. """ - if not isinstance(axis, (int, np.integer)): + if not isinstance(axis, int | np.integer): raise ValueError("Axis must be integer.") if indices.ndim != x.ndim: raise ValueError("Indices must have same dimensions as x.") diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index 923cd9f6..8b946428 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -259,6 +259,8 @@ def __init__( ) self._cache.fill_special(self.src.nbytes // self.src.typesize, blosc2.SpecialValue.UNINIT) self._schunk_cache = getattr(self._cache, "schunk", self._cache) + if self.urlpath is None: + self.urlpath = getattr(self._schunk_cache, "urlpath", None) vlmeta = kwargs.get("vlmeta") if vlmeta: for key in vlmeta: @@ -575,7 +577,7 @@ def __getitem__(self, item: slice | list[slice]) -> np.ndarray: return nparr[self.field] -def _convert_dtype(dt: str | DTypeLike): +def convert_dtype(dt: str | DTypeLike): """ Attempts to convert to blosc2.dtype (i.e. 
numpy dtype) """ @@ -617,14 +619,14 @@ def __init__(self, src, chunks: tuple | None = None, blocks: tuple | None = None if not hasattr(src, "__getitem__"): raise TypeError("The source must have a __getitem__ method") self._src = src - self._dtype = _convert_dtype(src.dtype) + self._dtype = convert_dtype(src.dtype) self._shape = src.shape if isinstance(src.shape, tuple) else tuple(src.shape) # Compute reasonable values for chunks and blocks cparams = blosc2.CParams(clevel=0) def is_ints_sequence(src, attr): seq = getattr(src, attr, None) - if not isinstance(seq, Sequence) or isinstance(seq, (str, bytes)): + if not isinstance(seq, Sequence) or isinstance(seq, str | bytes): return False return all(isinstance(x, int) for x in seq) diff --git a/src/blosc2/ref.py b/src/blosc2/ref.py index 9534d271..c0acca8e 100644 --- a/src/blosc2/ref.py +++ b/src/blosc2/ref.py @@ -118,7 +118,7 @@ def open(self): import blosc2 if self.kind == "urlpath": - return blosc2.open(self.urlpath, mode="r") + return blosc2.open(self.urlpath) if self.kind == "dictstore_key": return blosc2.DictStore(self.urlpath, mode="r")[self.key] if self.kind == "c2array": diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index 95c9df72..956f41d2 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -19,8 +19,8 @@ import blosc2 from blosc2 import SpecialValue, blosc2_ext -from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb from blosc2.info import InfoReporter, format_nbytes_info +from blosc2.msgpack_utils import msgpack_packb, msgpack_unpackb class vlmeta(MutableMapping, blosc2_ext.vlmeta): @@ -1621,7 +1621,7 @@ def _set_default_dparams(kwargs): kwargs["dparams"] = dparams -def _process_opened_object(res): +def process_opened_object(res): meta = getattr(res, "schunk", res).meta if "proxy-source" in meta: proxy_src = meta["proxy-source"] @@ -1634,6 +1634,9 @@ def _process_opened_object(res): elif not proxy_src["caterva2_env"]: raise RuntimeError("Could not find the source when opening 
a Proxy") + if "b2o" in meta: + return blosc2.open_b2object(res) + if "vlarray" in meta: from blosc2.vlarray import VLArray @@ -1645,7 +1648,7 @@ def _process_opened_object(res): return BatchStore(_from_schunk=getattr(res, "schunk", res)) if isinstance(res, blosc2.NDArray) and "LazyArray" in res.schunk.meta: - return blosc2._open_lazyarray(res) + return blosc2.open_lazyarray(res) else: return res @@ -1778,4 +1781,4 @@ def open( _set_default_dparams(kwargs) res = blosc2_ext.open(urlpath, mode, offset, **kwargs) - return _process_opened_object(res) + return process_opened_object(res) diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index 67434a3a..9ecbce58 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -80,7 +80,7 @@ def _string_endswith(a, b): return np.char.endswith(a, b) -def _format_expr_scalar(value): +def format_expr_scalar(value): if isinstance(value, np.generic): value = value.item() if isinstance(value, str | bytes): @@ -708,7 +708,7 @@ def _lookup_value(self, node): # noqa : C901 # handle negative constants like -1 if isinstance(node.op, ast.USub): val = self._lookup_value(node.operand) - if isinstance(val, (int, float)): + if isinstance(val, int | float): return -val # handle + (USub) if needed if isinstance(node.op, ast.UAdd): @@ -775,7 +775,7 @@ def slice_to_chunktuple(s, n): return range(start // n, ceiling(stop, n)) -def _get_selection(ctuple, ptuple, chunks): +def get_selection(ctuple, ptuple, chunks): # we assume that at least one element of chunk intersects with the slice # (as a consequence of only looping over intersecting chunks) # ptuple is global slice, ctuple is chunk coords (in units of chunks) @@ -841,7 +841,7 @@ def _get_selection(ctuple, ptuple, chunks): return out_pselection, pselection, loc_selection -def _get_local_slice(prior_selection, post_selection, chunk_bounds): +def get_local_slice(prior_selection, post_selection, chunk_bounds): chunk_begin, chunk_end = chunk_bounds # +1 for negative steps as have to include 
start (exclude stop) locbegin = np.hstack( @@ -865,7 +865,7 @@ def _get_local_slice(prior_selection, post_selection, chunk_bounds): return locbegin, locend -def _sliced_chunk_iter(chunks, idx, shape, axis=None, nchunk=False): +def sliced_chunk_iter(chunks, idx, shape, axis=None, nchunk=False): """ If nchunk is True, return an iterator over the number of the chunk. """ @@ -939,7 +939,7 @@ def get_intersecting_chunks(idx, shape, chunks, axis=None): return chunk_size.as_subchunks(idx, shape) # if _slice is (), returns all chunks # special algorithm to iterate over axis first (adapted from ndindex source) - return _sliced_chunk_iter(chunks, idx, shape, axis) + return sliced_chunk_iter(chunks, idx, shape, axis) def get_chunks_idx(shape, chunks): @@ -966,7 +966,7 @@ def is_inside_ne_evaluate() -> bool: return builtins.any(frame_info.function in {"ne_evaluate"} for frame_info in stack) -def _incomplete_lazyfunc(func) -> None: +def incomplete_lazyfunc(func) -> None: """Decorator for lazy functions with incomplete numexpr/miniexpr coverage. This function will force eager execution when called from ne_evaluate. 
@@ -1066,7 +1066,7 @@ def compute_smaller_slice(larger_shape, smaller_shape, larger_slice): ) -def _get_chunk_operands(operands, cslice, chunk_operands, shape): +def get_chunk_operands(operands, cslice, chunk_operands, shape): # Get the starts and stops for the slice cslice_shape = tuple(s.stop - s.start for s in cslice) starts = [s.start if s.start is not None else 0 for s in cslice] diff --git a/src/blosc2/vlarray.py b/src/blosc2/vlarray.py index 569ffce0..4f9f3401 100644 --- a/src/blosc2/vlarray.py +++ b/src/blosc2/vlarray.py @@ -12,8 +12,8 @@ from typing import TYPE_CHECKING, Any import blosc2 -from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb from blosc2.info import InfoReporter, format_nbytes_info +from blosc2.msgpack_utils import msgpack_packb, msgpack_unpackb if TYPE_CHECKING: from collections.abc import Iterator diff --git a/src/blosc2/_wasm_jit.py b/src/blosc2/wasm_jit.py similarity index 98% rename from src/blosc2/_wasm_jit.py rename to src/blosc2/wasm_jit.py index 54f08410..e8361ec8 100644 --- a/src/blosc2/_wasm_jit.py +++ b/src/blosc2/wasm_jit.py @@ -602,8 +602,8 @@ def init_wasm_jit_helpers() -> bool: from . 
import blosc2_ext - if not hasattr(blosc2_ext, "_register_wasm_jit_helpers"): - _trace("extension does not expose _register_wasm_jit_helpers") + if not hasattr(blosc2_ext, "register_wasm_jit_helpers"): + _trace("extension does not expose register_wasm_jit_helpers") return False _inject_pyodide_runtime_handles(js) @@ -618,7 +618,7 @@ def init_wasm_jit_helpers() -> bool: instantiate_ptr, free_ptr = helper_ptrs try: - blosc2_ext._register_wasm_jit_helpers(instantiate_ptr, free_ptr) + blosc2_ext.register_wasm_jit_helpers(instantiate_ptr, free_ptr) except Exception as exc: # pragma: no cover - pyodide-specific error path _trace(f"C helper registration failed: {exc}") return False diff --git a/tests/data/legacy_lazyexpr_v1/a.b2nd b/tests/data/legacy_lazyexpr_v1/a.b2nd new file mode 100644 index 00000000..a1eaef47 Binary files /dev/null and b/tests/data/legacy_lazyexpr_v1/a.b2nd differ diff --git a/tests/data/legacy_lazyexpr_v1/b.b2nd b/tests/data/legacy_lazyexpr_v1/b.b2nd new file mode 100644 index 00000000..c19f3013 Binary files /dev/null and b/tests/data/legacy_lazyexpr_v1/b.b2nd differ diff --git a/tests/data/legacy_lazyexpr_v1/expr.b2nd b/tests/data/legacy_lazyexpr_v1/expr.b2nd new file mode 100644 index 00000000..1a941376 Binary files /dev/null and b/tests/data/legacy_lazyexpr_v1/expr.b2nd differ diff --git a/tests/data/legacy_lazyudf_v1/a.b2nd b/tests/data/legacy_lazyudf_v1/a.b2nd new file mode 100644 index 00000000..c60c13bd Binary files /dev/null and b/tests/data/legacy_lazyudf_v1/a.b2nd differ diff --git a/tests/data/legacy_lazyudf_v1/b.b2nd b/tests/data/legacy_lazyudf_v1/b.b2nd new file mode 100644 index 00000000..8bcdbccf Binary files /dev/null and b/tests/data/legacy_lazyudf_v1/b.b2nd differ diff --git a/tests/data/legacy_lazyudf_v1/expr.b2nd b/tests/data/legacy_lazyudf_v1/expr.b2nd new file mode 100644 index 00000000..6a4dd9fa Binary files /dev/null and b/tests/data/legacy_lazyudf_v1/expr.b2nd differ diff --git a/tests/ndarray/test_dsl_kernels.py 
b/tests/ndarray/test_dsl_kernels.py index 3c422f8d..ac37beeb 100644 --- a/tests/ndarray/test_dsl_kernels.py +++ b/tests/ndarray/test_dsl_kernels.py @@ -975,3 +975,33 @@ def test_dsl_save_input_names_match(tmp_path): assert isinstance(reloaded.func, DSLKernel) assert reloaded.func.input_names == ["x", "y"] assert list(reloaded.inputs_dict.keys()) == ["x", "y"] + + +def test_dsl_save_dictstore_operands(tmp_path): + shape = (10,) + store_path = tmp_path / "ops.b2z" + ext_a = tmp_path / "a.b2nd" + ext_b = tmp_path / "b.b2nd" + expr_path = tmp_path / "lazy.b2nd" + + a = blosc2.asarray(np.arange(shape[0], dtype=np.float64), urlpath=str(ext_a), mode="w") + b = blosc2.asarray(np.arange(shape[0], dtype=np.float64) * 2, urlpath=str(ext_b), mode="w") + with blosc2.DictStore(str(store_path), mode="w", threshold=None) as dstore: + dstore["/a"] = a + dstore["/b"] = b + + with blosc2.DictStore(str(store_path), mode="r") as dstore: + a = dstore["/a"] + b = dstore["/b"] + lazy = blosc2.lazyudf(kernel_save_simple, (a, b), dtype=np.float64) + lazy.save(urlpath=str(expr_path)) + + carrier = blosc2.open(str(expr_path), mode="r").array + assert carrier.schunk.vlmeta["b2o"]["operands"] == { + "o0": {"kind": "dictstore_key", "version": 1, "urlpath": "ops.b2z", "key": "/a"}, + "o1": {"kind": "dictstore_key", "version": 1, "urlpath": "ops.b2z", "key": "/b"}, + } + + reloaded = blosc2.open(str(expr_path), mode="r") + expected = (np.arange(shape[0], dtype=np.float64) * 3) ** 2 + np.testing.assert_allclose(reloaded.compute()[...], expected, rtol=1e-5, atol=1e-6) diff --git a/tests/ndarray/test_lazyexpr.py b/tests/ndarray/test_lazyexpr.py index 6215beb5..cf4f0f63 100644 --- a/tests/ndarray/test_lazyexpr.py +++ b/tests/ndarray/test_lazyexpr.py @@ -1721,6 +1721,76 @@ def test_missing_operator(): blosc2.remove_urlpath("expr.b2nd") +def test_save_dictstore_operands(tmp_path): + store_path = tmp_path / "operands.b2z" + ext_a = tmp_path / "a.b2nd" + ext_b = tmp_path / "b.b2nd" + expr_path = tmp_path 
/ "expr.b2nd" + expected = np.arange(5, dtype=np.int64) * 3 + + a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(ext_a), mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=str(ext_b), mode="w") + with blosc2.DictStore(str(store_path), mode="w", threshold=None) as dstore: + dstore["/a"] = a + dstore["/b"] = b + + with blosc2.DictStore(str(store_path), mode="r") as dstore: + a = dstore["/a"] + b = dstore["/b"] + expr = blosc2.lazyexpr("a + b") + expr.save(expr_path) + + carrier = blosc2.open(expr_path, mode="r").array + assert carrier.schunk.vlmeta["b2o"]["operands"] == { + "a": {"kind": "dictstore_key", "version": 1, "urlpath": str(store_path), "key": "/a"}, + "b": {"kind": "dictstore_key", "version": 1, "urlpath": str(store_path), "key": "/b"}, + } + + restored = blosc2.open(expr_path) + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], expected) + + +def test_save_proxy_operands_reopen_default_mode(tmp_path): + src_path = tmp_path / "src.b2nd" + proxy_path = tmp_path / "proxy.b2nd" + expr_path = tmp_path / "expr.b2nd" + + src = blosc2.asarray(np.arange(10, dtype=np.int64), urlpath=str(src_path), mode="w") + proxy = blosc2.Proxy(src, urlpath=str(proxy_path), mode="w") + expr = proxy + proxy + expr.save(str(expr_path)) + + restored = blosc2.open(str(expr_path)) + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], np.arange(10, dtype=np.int64) * 2) + + +def test_lazyexpr_vlmeta_in_memory_and_persisted(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(tmp_path / "a.b2nd"), mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(tmp_path / "b.b2nd"), mode="w") + expr = a + b + + expr.vlmeta["name"] = "sum" + expr.vlmeta["config"] = {"scale": 1} + assert expr.vlmeta["name"] == "sum" + assert expr.vlmeta["config"] == {"scale": 1} + + expr_path = tmp_path / "expr_vlmeta.b2nd" + expr.save(str(expr_path)) + restored = 
blosc2.open(str(expr_path)) + + assert restored.vlmeta["name"] == "sum" + assert restored.vlmeta["config"] == {"scale": 1} + + restored.vlmeta["note"] = "persisted" + reopened = blosc2.open(str(expr_path)) + assert reopened.vlmeta["note"] == "persisted" + np.testing.assert_array_equal(reopened[:], np.arange(5, dtype=np.int64) * 2) + + # Test the chaining of multiple lazy expressions def test_chain_expressions(): N = 1_000 @@ -1843,12 +1913,8 @@ def test_to_cframe(): dtype = "float64" a = blosc2.linspace(0, 1, N * N, dtype=dtype, shape=(N, N)) expr = a**3 + blosc2.sin(a**2) - cframe = expr.to_cframe() - assert len(cframe) > 0 - arr = blosc2.ndarray_from_cframe(cframe) - assert arr.shape == expr.shape - assert arr.dtype == expr.dtype - assert np.allclose(arr[:], expr[:]) + with pytest.raises(ValueError, match="stored on disk/network"): + expr.to_cframe() # Test for the bug where multiplying two complex lazy expressions would fail with: diff --git a/tests/ndarray/test_lazyudf.py b/tests/ndarray/test_lazyudf.py index f2b164f5..3aa76cb6 100644 --- a/tests/ndarray/test_lazyudf.py +++ b/tests/ndarray/test_lazyudf.py @@ -501,6 +501,23 @@ def test_save_ludf(): blosc2.remove_urlpath(urlpath) +def test_lazyudf_vlmeta_roundtrip(tmp_path): + a_path = tmp_path / "a.b2nd" + expr_path = tmp_path / "lazyudf_vlmeta.b2nd" + array = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=str(a_path), mode="w") + expr = blosc2.lazyudf(udf1p, (array,), np.float64) + + expr.vlmeta["name"] = "increment" + expr.vlmeta["attrs"] = {"version": 1} + expr.save(urlpath=str(expr_path)) + + restored = blosc2.open(str(expr_path)) + + assert isinstance(restored, blosc2.LazyUDF) + assert restored.vlmeta["name"] == "increment" + assert restored.vlmeta["attrs"] == {"version": 1} + + # Test get_chunk method def test_get_chunk(): a = blosc2.linspace(0, 100, 100, shape=(10, 10), chunks=(3, 4), blocks=(2, 3)) diff --git a/tests/test_b2objects.py b/tests/test_b2objects.py new file mode 100644 index 
00000000..6317d34e --- /dev/null +++ b/tests/test_b2objects.py @@ -0,0 +1,209 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +from pathlib import Path + +import numpy as np + +import blosc2 +import blosc2.c2array as blosc2_c2array + + +@blosc2.dsl_kernel +def kernel_add_square(x, y): + return x * x + y * y + 2 * x * y + + +def _make_c2array( + monkeypatch, + path="@public/examples/ds-1d.b2nd", + urlbase="https://cat2.cloud/demo/", + auth_token=None, + shape=(5,), + chunks=(5,), + blocks=(5,), + dtype=np.int64, +): + dtype = np.dtype(dtype) + + def fake_info(path_, urlbase_, params=None, headers=None, model=None, auth_token=None): + return { + "shape": list(shape), + "chunks": list(chunks), + "blocks": list(blocks), + "dtype": np.dtype(dtype).str, + "schunk": { + "cparams": dict(blosc2.cparams_dflts), + "nbytes": int(np.prod(shape)) * np.dtype(dtype).itemsize, + "cbytes": int(np.prod(shape)) * np.dtype(dtype).itemsize, + "cratio": 1.0, + "blocksize": int(np.prod(blocks)) * np.dtype(dtype).itemsize, + "vlmeta": {}, + }, + } + + monkeypatch.setattr(blosc2_c2array, "info", fake_info) + return blosc2.C2Array(path, urlbase=urlbase, auth_token=auth_token) + + +def test_c2array_from_cframe_roundtrip(monkeypatch): + original = _make_c2array(monkeypatch, auth_token="secret-token") + carrier = blosc2.ndarray_from_cframe(original.to_cframe()) + + assert carrier.schunk.meta["b2o"] == {"kind": "c2array", "version": 1} + assert carrier.schunk.vlmeta["b2o"] == { + "kind": "c2array", + "version": 1, + "path": original.path, + "urlbase": original.urlbase, + } + + restored = blosc2.from_cframe(original.to_cframe()) + + assert isinstance(restored, blosc2.C2Array) + assert restored.path == original.path + assert 
restored.urlbase == original.urlbase + assert restored.auth_token is None + assert restored.shape == original.shape + assert restored.dtype == original.dtype + + +def test_c2array_open_roundtrip(tmp_path, monkeypatch): + original = _make_c2array(monkeypatch, shape=(8,), chunks=(4,), blocks=(2,)) + urlpath = tmp_path / "remote-array.b2nd" + + original.save(urlpath) + restored = blosc2.open(urlpath, mode="r") + + assert isinstance(restored, blosc2.C2Array) + assert restored.path == original.path + assert restored.urlbase == original.urlbase + assert restored.auth_token is None + assert restored.shape == original.shape + assert restored.chunks == original.chunks + assert restored.blocks == original.blocks + + +def test_lazyexpr_from_cframe_roundtrip(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=tmp_path / "a.b2nd", mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=tmp_path / "b.b2nd", mode="w") + expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b}) + carrier = blosc2.ndarray_from_cframe(expr.to_cframe()) + + assert carrier.schunk.meta["b2o"] == {"kind": "lazyexpr", "version": 1} + assert carrier.schunk.vlmeta["b2o"] == { + "kind": "lazyexpr", + "version": 1, + "expression": "a + b", + "operands": { + "a": {"kind": "urlpath", "version": 1, "urlpath": (tmp_path / "a.b2nd").as_posix()}, + "b": {"kind": "urlpath", "version": 1, "urlpath": (tmp_path / "b.b2nd").as_posix()}, + }, + } + + restored = blosc2.from_cframe(expr.to_cframe()) + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3) + + +def test_lazyexpr_open_roundtrip(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.int64), urlpath=tmp_path / "a.b2nd", mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.int64) * 2, urlpath=tmp_path / "b.b2nd", mode="w") + expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b}) + urlpath = tmp_path / "expr.b2nd" + + expr.save(urlpath) + restored = 
blosc2.open(urlpath, mode="r") + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3) + + +def test_legacy_lazyexpr_open_backward_compat(): + fixture = Path(__file__).parent / "data" / "legacy_lazyexpr_v1" / "expr.b2nd" + + restored = blosc2.open(fixture, mode="r") + + assert isinstance(restored, blosc2.LazyExpr) + np.testing.assert_array_equal(restored[:], np.arange(5, dtype=np.int64) * 3) + + +def test_legacy_lazyudf_open_backward_compat(): + fixture = Path(__file__).parent / "data" / "legacy_lazyudf_v1" / "expr.b2nd" + + restored = blosc2.open(fixture, mode="r") + + assert isinstance(restored, blosc2.LazyUDF) + np.testing.assert_allclose(restored.compute()[:], (np.arange(5, dtype=np.float64) * 3) ** 2) + + +def test_lazyudf_from_cframe_roundtrip(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.float64), urlpath=tmp_path / "a.b2nd", mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.float64) * 2, urlpath=tmp_path / "b.b2nd", mode="w") + expr = blosc2.lazyudf(kernel_add_square, (a, b), dtype=np.float64) + carrier = blosc2.ndarray_from_cframe(expr.to_cframe()) + + assert carrier.schunk.meta["b2o"] == {"kind": "lazyudf", "version": 1} + payload = carrier.schunk.vlmeta["b2o"] + assert payload["kind"] == "lazyudf" + assert payload["version"] == 1 + assert payload["function_kind"] == "dsl" + assert payload["dsl_version"] == 1 + assert payload["name"] == "kernel_add_square" + assert "kernel_add_square" in payload["udf_source"] + assert payload["dtype"] == np.dtype(np.float64).str + assert payload["shape"] == [5] + assert payload["operands"] == { + "o0": {"kind": "urlpath", "version": 1, "urlpath": (tmp_path / "a.b2nd").as_posix()}, + "o1": {"kind": "urlpath", "version": 1, "urlpath": (tmp_path / "b.b2nd").as_posix()}, + } + + restored = blosc2.from_cframe(expr.to_cframe()) + + assert isinstance(restored, blosc2.LazyUDF) + np.testing.assert_allclose(restored[:], (np.arange(5, 
dtype=np.float64) * 3) ** 2) + + +def test_lazyudf_open_roundtrip(tmp_path): + a = blosc2.asarray(np.arange(5, dtype=np.float64), urlpath=tmp_path / "a.b2nd", mode="w") + b = blosc2.asarray(np.arange(5, dtype=np.float64) * 2, urlpath=tmp_path / "b.b2nd", mode="w") + expr = blosc2.lazyudf(kernel_add_square, (a, b), dtype=np.float64) + urlpath = tmp_path / "expr.b2nd" + + expr.save(urlpath) + restored = blosc2.open(urlpath, mode="r") + + assert isinstance(restored, blosc2.LazyUDF) + np.testing.assert_allclose(restored[:], (np.arange(5, dtype=np.float64) * 3) ** 2) + + +def test_b2z_bundle_with_lazy_recipes_opens_read_only(tmp_path): + bundle_path = tmp_path / "bundle.b2z" + + with blosc2.DictStore(str(bundle_path), mode="w", threshold=1) as store: + store["/data/a"] = np.arange(5, dtype=np.float64) + store["/data/b"] = np.arange(5, dtype=np.float64) * 2 + + a = store["/data/a"] + b = store["/data/b"] + expr = blosc2.lazyexpr("a + b", operands={"a": a, "b": b}) + udf = blosc2.lazyudf(kernel_add_square, (a, b), dtype=np.float64, shape=a.shape) + + store["/recipes/expr"] = blosc2.ndarray_from_cframe(expr.to_cframe()) + store["/recipes/udf"] = blosc2.ndarray_from_cframe(udf.to_cframe()) + + with blosc2.open(str(bundle_path), mode="r") as store: + restored_expr = store["/recipes/expr"] + restored_udf = store["/recipes/udf"] + + assert isinstance(restored_expr, blosc2.LazyExpr) + assert isinstance(restored_udf, blosc2.LazyUDF) + np.testing.assert_allclose(restored_expr.compute()[:], np.arange(5, dtype=np.float64) * 3) + np.testing.assert_allclose(restored_udf.compute()[:], (np.arange(5, dtype=np.float64) * 3) ** 2) diff --git a/tests/test_batch_store.py b/tests/test_batch_store.py index 7c693732..ea27d809 100644 --- a/tests/test_batch_store.py +++ b/tests/test_batch_store.py @@ -10,7 +10,7 @@ import blosc2 import blosc2.c2array as blosc2_c2array -from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb +from blosc2.msgpack_utils import msgpack_packb, 
msgpack_unpackb @blosc2.dsl_kernel diff --git a/tests/test_dict_store.py b/tests/test_dict_store.py index 337ace30..4347eaf4 100644 --- a/tests/test_dict_store.py +++ b/tests/test_dict_store.py @@ -401,7 +401,7 @@ def test_metadata_discovery_reopens_renamed_external_vlarray(storage_type, tmp_p assert value.vlmeta["description"] == "Renamed VLArray" -def test_metadata_discovery_warns_and_skips_unsupported_blosc2_leaf(tmp_path): +def test_metadata_discovery_reopens_lazyexpr_leaf(tmp_path): path = tmp_path / "test_unsupported_lazyexpr.b2d" with DictStore(str(path), mode="w") as dstore: @@ -413,13 +413,13 @@ def test_metadata_discovery_warns_and_skips_unsupported_blosc2_leaf(tmp_path): expr_path = path / "unsupported_lazyexpr.b2nd" expr.save(str(expr_path)) - with pytest.warns( - UserWarning, match=r"Ignoring unsupported Blosc2 object.*unsupported_lazyexpr\.b2nd.*LazyExpr" - ): - dstore_read = DictStore(str(path), mode="r") + dstore_read = DictStore(str(path), mode="r") with dstore_read: - assert "/unsupported_lazyexpr" not in dstore_read + assert "/unsupported_lazyexpr" in dstore_read assert "/embedded" in dstore_read + value = dstore_read["/unsupported_lazyexpr"] + assert isinstance(value, blosc2.LazyExpr) + np.testing.assert_array_equal(value[:], np.arange(5) * 2) def _digest_value(value):