From 67c7e99d95265f4a332aa8dd7408b0e27aa7a0d0 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Tue, 3 Feb 2026 20:14:27 +0100 Subject: [PATCH 1/7] Initial attempt (need to add tests) --- src/blosc2/lazyexpr.py | 462 +++++++++++++++++++---------------------- src/blosc2/ndarray.py | 82 ++++++++ src/blosc2/utils.py | 106 ++++++++++ 3 files changed, 403 insertions(+), 247 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 6f53a249..b1ef22a9 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -49,6 +49,9 @@ from .proxy import _convert_dtype from .utils import ( NUMPY_GE_2_0, + _get_chunk_operands, + check_smaller_shape, + compute_smaller_slice, constructors, elementwise_funcs, get_chunks_idx, @@ -137,6 +140,45 @@ def ne_evaluate(expression, local_dict=None, **kwargs): return res[()] if isinstance(res, blosc2.Operand) else res +def _get_result(expression, chunk_operands, ne_args, where=None, indices=None, _order=None): + chunk_indices = None + if (expression == "o0" or expression == "(o0)") and where is None: + # We don't have an actual expression, so avoid a copy except to make contiguous (later) + return chunk_operands["o0"], None + # Apply the where condition (in result) + if where is not None and len(where) == 2: + # x = chunk_operands["_where_x"] + # y = chunk_operands["_where_y"] + # result = np.where(result, x, y) + # numexpr is a bit faster than np.where, and we can fuse operations in this case + new_expr = f"where({expression}, _where_x, _where_y)" + return ne_evaluate(new_expr, chunk_operands, **ne_args), None + + result = ne_evaluate(expression, chunk_operands, **ne_args) + if where is None: + return result, None + elif len(where) == 1: + x = chunk_operands["_where_x"] + if (indices is not None) or (_order is not None): + # Return indices only makes sense when the where condition is a tuple with one element + # and result is a boolean array + if len(x.shape) > 1: + raise ValueError("indices() and sort() only support 1D arrays") + if result.dtype != np.bool_: + raise ValueError("indices() and sort() only support bool conditions") + if _order: + # We need to cumulate all the fields in _order, as well as indices + chunk_indices = indices[result] + result = x[_order][result] + else: + chunk_indices = None + result = indices[result] + return result, chunk_indices + else: + return x[result], None + raise ValueError("The where condition must be a tuple with one or two elements") + + # Define empty ndindex tuple for function defaults NDINDEX_EMPTY_TUPLE = ndindex.Tuple() @@ -246,6 +288,8 @@ class ReduceOp(Enum): ALL = np.all ARGMAX = np.argmax ARGMIN = np.argmin + CUMULATIVE_SUM = np.cumulative_sum + CUMULATIVE_PROD = np.cumulative_prod class LazyArrayEnum(Enum): @@ -522,79 +566,6 @@ def compute_broadcast_shape(arrays): return np.broadcast_shapes(*shapes) if shapes else None -def check_smaller_shape(value_shape, shape, slice_shape, slice_): - """Check whether the shape of the value is smaller than the shape of the array. - - This follows the NumPy broadcasting rules. 
- """ - # slice_shape must be as long as shape - if len(slice_shape) != len(slice_): - raise ValueError("slice_shape must be as long as slice_") - no_nones_shape = tuple(sh for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) - no_nones_slice = tuple(s for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) - is_smaller_shape = any( - s > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_shape) - ) - slice_past_bounds = any( - s.stop > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_slice) - ) - return len(value_shape) < len(shape) or is_smaller_shape or slice_past_bounds - - -def _compute_smaller_slice(larger_shape, smaller_shape, larger_slice): - smaller_slice = [] - diff_dims = len(larger_shape) - len(smaller_shape) - - for i in range(len(larger_shape)): - if i < diff_dims: - # For leading dimensions of the larger array that the smaller array doesn't have, - # we don't add anything to the smaller slice - pass - else: - # For dimensions that both arrays have, the slice for the smaller array should be - # the same as the larger array unless the smaller array's size along that dimension - # is 1, in which case we use None to indicate the full slice - if smaller_shape[i - diff_dims] != 1: - smaller_slice.append(larger_slice[i]) - else: - smaller_slice.append(slice(0, larger_shape[i])) - - return tuple(smaller_slice) - - -# A more compact version of the function above, albeit less readable -def compute_smaller_slice(larger_shape, smaller_shape, larger_slice): - """ - Returns the slice of the smaller array that corresponds to the slice of the larger array. - """ - j_small = len(smaller_shape) - 1 - j_large = len(larger_shape) - 1 - smaller_shape_nones = [] - larger_shape_nones = [] - for s in reversed(larger_slice): - if s is None: - smaller_shape_nones.append(1) - larger_shape_nones.append(1) - else: - if j_small >= 0: - smaller_shape_nones.append(smaller_shape[j_small]) - j_small -= 1 - if j_large >= 0: - larger_shape_nones.append(larger_shape[j_large]) - j_large -= 1 - smaller_shape_nones.reverse() - larger_shape_nones.reverse() - diff_dims = len(larger_shape_nones) - len(smaller_shape_nones) - return tuple( - None - if larger_slice[i] is None - else ( - larger_slice[i] if smaller_shape_nones[i - diff_dims] != 1 else slice(0, larger_shape_nones[i]) - ) - for i in range(diff_dims, len(larger_shape_nones)) - ) - - # Define the patterns for validation validation_patterns = [ r"[\;]", # Flow control characters @@ -1592,12 +1563,12 @@ def slices_eval( # noqa: C901 ) # if _slice is (), returns all chunks for nchunk, chunk_slice in enumerate(intersecting_chunks): - # get intersection of chunk and target - if _slice != (): - cslice = step_handler(chunk_slice.raw, _slice) - else: - cslice = chunk_slice.raw - + # Check whether current cslice intersects with _slice + cslice = chunk_slice.raw + if cslice != () and _slice != (): + # get intersection of chunk and target + cslice = step_handler(cslice, _slice) + offset = tuple(s.start for s in cslice) # offset for the udf cslice_shape = tuple(s.stop - s.start for s in cslice) len_chunk = math.prod(cslice_shape) # get local index of part of out that is to be updated @@ -1605,35 +1576,7 @@ def slices_eval( # noqa: C901 ndindex.ndindex(cslice).as_subindex(_slice).raw ) # in the case _slice=(), just gives cslice - # Get the starts and stops for the slice - starts = [s.start if s.start is not None else 0 for s in cslice] - stops = [s.stop if s.stop is not None else sh 
for s, sh in zip(cslice, cslice_shape, strict=True)] - unit_steps = np.all([s.step == 1 for s in cslice]) - - # Get the slice of each operand - for key, value in operands.items(): - if np.isscalar(value): - chunk_operands[key] = value - continue - if value.shape == (): - chunk_operands[key] = value[()] - continue - if check_smaller_shape(value.shape, shape, cslice_shape, cslice): - # We need to fetch the part of the value that broadcasts with the operand - smaller_slice = compute_smaller_slice(shape, value.shape, cslice) - chunk_operands[key] = value[smaller_slice] - continue - # If key is in operands, we can reuse the buffer - if ( - key in chunk_operands - and cslice_shape == chunk_operands[key].shape - and isinstance(value, blosc2.NDArray) - and unit_steps - ): - value.get_slice_numpy(chunk_operands[key], (starts, stops)) - continue - - chunk_operands[key] = value[cslice] + _get_chunk_operands(operands, cslice, chunk_operands, shape) if out is None: shape_ = shape_slice if shape_slice is not None else shape @@ -1664,47 +1607,16 @@ def slices_eval( # noqa: C901 result = np.empty(cslice_shape, dtype=out.dtype) # raises error if out is None # cslice should be equal to cslice_subidx # Call the udf directly and use result as the output array - offset = tuple(s.start for s in cslice) expression(tuple(chunk_operands.values()), result, offset=offset) out[cslice_subidx] = result continue - if where is None: - result = ne_evaluate(expression, chunk_operands, **ne_args) + if _indices or _order: + indices = np.arange(leninputs, leninputs + len_chunk, dtype=np.int64).reshape(cslice_shape) + leninputs += len_chunk + result, chunk_indices = _get_result(expression, chunk_operands, ne_args, where, indices, _order) else: - # Apply the where condition (in result) - if len(where) == 2: - # x = chunk_operands["_where_x"] - # y = chunk_operands["_where_y"] - # result = np.where(result, x, y) - # numexpr is a bit faster than np.where, and we can fuse operations in this case - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, chunk_operands, **ne_args) - elif len(where) == 1: - result = ne_evaluate(expression, chunk_operands, **ne_args) - if _indices or _order: - # Return indices only makes sense when the where condition is a tuple with one element - # and result is a boolean array - x = chunk_operands["_where_x"] - if len(x.shape) > 1: - raise ValueError("indices() and sort() only support 1D arrays") - if result.dtype != np.bool_: - raise ValueError("indices() and sort() only support bool conditions") - indices = np.arange(leninputs, leninputs + len_chunk, dtype=np.int64).reshape( - cslice_shape - ) - if _order: - # We need to cumulate all the fields in _order, as well as indices - chunk_indices = indices[result] - result = x[_order][result] - else: - result = indices[result] - leninputs += len_chunk - else: - x = chunk_operands["_where_x"] - result = x[result] - else: - raise ValueError("The where condition must be a tuple with one or two elements") + result, _ = _get_result(expression, chunk_operands, ne_args, where) # Enforce contiguity of result (necessary to fill the out array) # but avoid copy if already contiguous result = np.require(result, requirements="C") @@ -1811,9 +1723,9 @@ def slices_eval_getitem( shape = out.shape # compute the shape of the output array - _slice_bcast = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in _slice.raw) - slice_shape = ndindex.ndindex(_slice_bcast).newshape(shape) # includes dummy dimensions _slice = _slice.raw + 
_slice_bcast = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in _slice) + slice_shape = ndindex.ndindex(_slice_bcast).newshape(shape) # includes dummy dimensions # Get the slice of each operand slice_operands = {} @@ -1838,12 +1750,7 @@ def slices_eval_getitem( result = np.empty(slice_shape, dtype=dtype) expression(tuple(slice_operands.values()), result, offset=offset) else: - if where is None: - result = ne_evaluate(expression, slice_operands, **ne_args) - else: - # Apply the where condition (in result) - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, slice_operands, **ne_args) + result, _ = _get_result(expression, slice_operands, ne_args, where) if out is None: # avoid copying unnecessarily try: @@ -1863,7 +1770,7 @@ def infer_reduction_dtype(dtype, operation): my_float = np.result_type( dtype, np.float32 if dtype in (np.float32, np.complex64) else blosc2.DEFAULT_FLOAT ) - if operation in {ReduceOp.SUM, ReduceOp.PROD}: + if operation in {ReduceOp.SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: if np.issubdtype(dtype, np.bool_): return np.int64 if np.issubdtype(dtype, np.unsignedinteger): @@ -1945,7 +1852,8 @@ def reduce_slices( # noqa: C901 reduce_op = reduce_args.pop("op") reduce_op_str = reduce_args.pop("op_str", None) axis = reduce_args["axis"] - keepdims = reduce_args["keepdims"] + keepdims = reduce_args.get("keepdims", False) + include_initial = reduce_args.get("include_initial", False) dtype = reduce_args.get("dtype", None) if dtype is None: dtype = kwargs.pop("dtype", None) @@ -1977,11 +1885,17 @@ def reduce_slices( # noqa: C901 if reduce_args["axis"] is not None: # conserve as integer if was not tuple originally reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis - if keepdims: - reduced_shape = tuple(1 if i in axis else s for i, s in enumerate(shape_slice)) + if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + reduced_shape = (np.prod(shape_slice),) if reduce_args["axis"] is None else shape_slice + temp_axis = 0 if reduce_args["axis"] is None else reduce_args["axis"] + if include_initial: + reduced_shape = tuple(s + 1 if i == temp_axis else s for i, s in enumerate(shape_slice)) else: - reduced_shape = tuple(s for i, s in enumerate(shape_slice) if i not in axis) - mask_slice = mask_slice[[i for i in range(len(mask_slice)) if i not in axis]] + if keepdims: + reduced_shape = tuple(1 if i in axis else s for i, s in enumerate(shape_slice)) + else: + reduced_shape = tuple(s for i, s in enumerate(shape_slice) if i not in axis) + mask_slice = mask_slice[[i for i in range(len(mask_slice)) if i not in axis]] if out is not None and reduced_shape != out.shape: raise ValueError("Provided output shape does not match the reduced shape.") @@ -2046,7 +1960,7 @@ def reduce_slices( # noqa: C901 use_miniexpr = False # Some reductions are not supported yet in miniexpr - if reduce_op in (ReduceOp.ARGMAX, ReduceOp.ARGMIN): + if reduce_op in (ReduceOp.ARGMAX, ReduceOp.ARGMIN, ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM): use_miniexpr = False # Check whether we can use miniexpr @@ -2081,9 +1995,9 @@ def reduce_slices( # noqa: C901 nblocks = res_eval.nbytes // res_eval.blocksize # Initialize aux_reduc based on the reduction operation # Padding blocks won't be written, so initial values matter for the final reduction - if reduce_op == ReduceOp.SUM or reduce_op == ReduceOp.ANY: + if reduce_op in {ReduceOp.SUM, ReduceOp.ANY, ReduceOp.CUMULATIVE_SUM}: aux_reduc = np.zeros(nblocks, 
dtype=dtype) - elif reduce_op == ReduceOp.PROD or reduce_op == ReduceOp.ALL: + elif reduce_op in {ReduceOp.PROD, ReduceOp.ALL, ReduceOp.CUMULATIVE_SUM}: aux_reduc = np.ones(nblocks, dtype=dtype) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): @@ -2140,61 +2054,79 @@ def reduce_slices( # noqa: C901 out_init = False for nchunk, chunk_slice in enumerate(intersecting_chunks): + # Special case for cumulative operations with axis = None + if reduce_args["axis"] is None and reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}: + # res_out_ is just None, out set to all 0s (sum) or 1s (prod) + out, res_out_ = convert_none_out(dtype, reduce_op, reduced_shape) + # reduced_shape is just one-element tuple + chunklen = out.chunks[0] if hasattr(out, "chunks") else chunks[-1] + carry = 0 + for cidx in range(0, reduced_shape[0] // chunklen): + slice_starts = np.unravel_index(cidx * chunklen, shape) + slice_stops = np.unravel_index((cidx + 1) * chunklen, shape) + cslice = tuple( + slice(start, stop) for start, stop in zip(slice_starts, slice_stops, strict=True) + ) + _get_chunk_operands(operands, cslice, chunk_operands, shape) + result, _ = _get_result(expression, chunk_operands, ne_args, where) + result = np.require(result, requirements="C") + if reduce_op == ReduceOp.CUMULATIVE_SUM: + res = np.cumulative_sum(result, axis=None) + carry + else: + res = np.cumulative_prod(result, axis=None) * carry + carry = res[-1] + out[cidx * chunklen + include_initial : (cidx + 1) * chunklen + include_initial] = res + break # skip looping over chunks + cslice = chunk_slice.raw - offset = tuple(s.start for s in cslice) # offset for the udf # Check whether current cslice intersects with _slice if cslice != () and _slice != (): # get intersection of chunk and target cslice = step_handler(cslice, _slice) - chunks_ = tuple(s.stop - s.start for s in cslice) - unit_steps = np.all([s.step == 1 for s in cslice]) - # Starts for slice + offset = tuple(s.start for s in cslice) # offset for the udf starts = [s.start if s.start is not None else 0 for s in cslice] + unit_steps = np.all([s.step == 1 for s in cslice]) + cslice_shape = tuple(s.stop - s.start for s in cslice) if _slice == () and fast_path and unit_steps: # Fast path - full_chunk = chunks_ == chunks + full_chunk = cslice_shape == chunks fill_chunk_operands( - operands, cslice, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=True + operands, + cslice, + cslice_shape, + full_chunk, + aligned, + nchunk, + iter_disk, + chunk_operands, + reduc=True, ) else: - # Get the stops for the slice - stops = [s.stop if s.stop is not None else sh for s, sh in zip(cslice, chunks_, strict=True)] - # Get the slice of each operand - for key, value in operands.items(): - if np.isscalar(value): - chunk_operands[key] = value - continue - if value.shape == (): - chunk_operands[key] = value[()] - continue - if check_smaller_shape(value.shape, shape, chunks_, cslice): - # We need to fetch the part of the value that broadcasts with the operand - smaller_slice = compute_smaller_slice(operand.shape, value.shape, cslice) - chunk_operands[key] = value[smaller_slice] - continue - # If key is in operands, we can reuse the buffer - if ( - key in chunk_operands - and chunks_ == chunk_operands[key].shape - and isinstance(value, blosc2.NDArray) - and unit_steps - ): - value.get_slice_numpy(chunk_operands[key], (starts, stops)) - continue - chunk_operands[key] = value[cslice] + _get_chunk_operands(operands, cslice, chunk_operands, shape) # get local index 
of part of out that is to be updated cslice_subidx = ndindex.ndindex(cslice).as_subindex(_slice).raw # if _slice is (), just gives cslice - if keepdims: - reduced_slice = tuple(slice(None) if i in axis else sl for i, sl in enumerate(cslice_subidx)) + if reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}: + reduced_slice = ( + tuple( + slice(sl.start + 1, sl.stop + 1, sl.step) if i in axis else sl + for i, sl in enumerate(cslice_subidx) + ) + if include_initial + else cslice_subidx + ) else: - reduced_slice = tuple(sl for i, sl in enumerate(cslice_subidx) if i not in axis) + reduced_slice = ( + tuple(slice(None) if i in axis else sl for i, sl in enumerate(cslice_subidx)) + if keepdims + else tuple(sl for i, sl in enumerate(cslice_subidx) if i not in axis) + ) # Evaluate and reduce the expression using chunks of operands if callable(expression): # TODO: Implement the reductions for UDFs (and test them) - result = np.empty(chunks_, dtype=out.dtype) + result = np.empty(cslice_shape, dtype=out.dtype) expression(tuple(chunk_operands.values()), result, offset=offset) # Reduce the result result = reduce_op.value.reduce(result, **reduce_args) @@ -2202,38 +2134,29 @@ def reduce_slices( # noqa: C901 out[reduced_slice] = reduce_op.value(out[reduced_slice], result) continue - if where is None: - if expression == "o0" or expression == "(o0)": - # We don't have an actual expression, so avoid a copy except to make contiguous - result = np.require(chunk_operands["o0"], requirements="C") - else: - result = ne_evaluate(expression, chunk_operands, **ne_args) - else: - # Apply the where condition (in result) - if len(where) == 2: - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, chunk_operands, **ne_args) - elif len(where) == 1: - result = ne_evaluate(expression, chunk_operands, **ne_args) - x = chunk_operands["_where_x"] - result = x[result] - else: - raise NotImplementedError( - "A where condition with no params in combination with reductions is not supported yet" - ) + result, _ = _get_result(expression, chunk_operands, ne_args, where) + # Enforce contiguity of result (necessary to fill the out array) + # but avoid copy if already contiguous + result = np.require(result, requirements="C") # Reduce the result if result.shape == (): if reduce_op == ReduceOp.SUM and result[()] == 0: # Avoid a reduction when result is a zero scalar. Faster for sparse data. 
continue - # Note that chunks_ refers to slice of operand chunks, not reduced_slice - result = np.full(chunks_, result[()]) + # Note that cslice_shape refers to slice of operand chunks, not reduced_slice + result = np.full(cslice_shape, result[()]) if reduce_op == ReduceOp.ANY: result = np.any(result, **reduce_args) elif reduce_op == ReduceOp.ALL: result = np.all(result, **reduce_args) - elif reduce_op == ReduceOp.ARGMAX or reduce_op == ReduceOp.ARGMIN: + elif reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + result = ( + np.cumulative_sum(result, axis=reduce_args["axis"]) + if reduce_op == ReduceOp.CUMULATIVE_SUM + else np.cumulative_prod(result, axis=reduce_args["axis"]) + ) + elif reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: # offset for start of slice slice_ref = ( starts @@ -2250,7 +2173,7 @@ def reduce_slices( # noqa: C901 ) if reduce_args["axis"] is None: # indexing into flattened array result = result[np.unravel_index(result_idx, shape=result.shape)] - idx_within_cslice = np.unravel_index(result_idx, shape=chunks_) + idx_within_cslice = np.unravel_index(result_idx, shape=cslice_shape) result_idx = np.ravel_multi_index( tuple(o + i for o, i in zip(slice_ref, idx_within_cslice, strict=True)), shape_slice ) @@ -2279,24 +2202,33 @@ def reduce_slices( # noqa: C901 out[reduced_slice] += result elif reduce_op == ReduceOp.ALL: out[reduced_slice] *= result - elif res_out_ is not None: # i.e. ReduceOp.ARGMAX or ReduceOp.ARGMIN + elif res_out_ is not None: # need lowest index for which optimum attained - cond = (res_out_[reduced_slice] == result) & (result_idx < out[reduced_slice]) - if reduce_op == ReduceOp.ARGMAX: - cond |= res_out_[reduced_slice] < result - else: # ARGMIN - cond |= res_out_[reduced_slice] > result - if reduced_slice == (): - out = np.where(cond, result_idx, out[reduced_slice]) - res_out_ = np.where(cond, result, res_out_[reduced_slice]) - else: + if reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: + cond = (res_out_[reduced_slice] == result) & (result_idx < out[reduced_slice]) + cond |= ( + res_out_[reduced_slice] < result + if reduce_op == ReduceOp.ARGMAX + else res_out_[reduced_slice] > result + ) out[reduced_slice] = np.where(cond, result_idx, out[reduced_slice]) res_out_[reduced_slice] = np.where(cond, result, res_out_[reduced_slice]) + else: # CUMULATIVE_SUM or CUMULATIVE_PROD + idx_result = tuple( + slice(-1, None) if i == reduce_args["axis"] else slice(None, None) + for i, c in enumerate(cslice) + ) + idx_lastval = tuple( + slice(0, 1) if i == reduce_args["axis"] else c for i, c in enumerate(cslice) + ) + if reduce_op == ReduceOp.CUMULATIVE_SUM: + result += res_out_[idx_lastval] + else: # CUMULATIVE_PROD + result *= res_out_[idx_lastval] + out[reduced_slice] = result + res_out_[idx_lastval] = result[idx_result] else: - if reduced_slice == (): - out = reduce_op.value(out, result) - else: - out[reduced_slice] = reduce_op.value(out[reduced_slice], result) + out[reduced_slice] = reduce_op.value(out[reduced_slice], result) # No longer need res_out_ del res_out_ @@ -2309,6 +2241,7 @@ def reduce_slices( # noqa: C901 dtype = np.float64 out, _ = convert_none_out(dtype, reduce_op, reduced_shape) + out = out[()] if reduced_shape == () else out # undo dummy dim from inside convert_none_out final_mask = tuple(np.where(mask_slice)[0]) if np.any(mask_slice): # remove dummy dims out = np.squeeze(out, axis=final_mask) @@ -2318,13 +2251,23 @@ def reduce_slices( # noqa: C901 return out -def convert_none_out(dtype, reduce_op, reduced_shape): +def 
convert_none_out(dtype, reduce_op, reduced_shape, axis=None):
     out = None
+    reduced_shape = (1,) if reduced_shape == () else reduced_shape
     # out will be a proper numpy.ndarray
-    if reduce_op == ReduceOp.SUM:
-        out = np.zeros(reduced_shape, dtype=dtype)
-    elif reduce_op == ReduceOp.PROD:
-        out = np.ones(reduced_shape, dtype=dtype)
+    if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_PROD}:
+        out = (
+            np.zeros(reduced_shape, dtype=dtype)
+            if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM}
+            else np.ones(reduced_shape, dtype=dtype)
+        )
+        # Get res_out to hold running sums along axes for chunks when doing cumulative sums/prods with axis not None
+        res_out_ = (
+            None
+            if axis is None
+            else np.empty(tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)), dtype=dtype)
+        )
+        out = (out, res_out_)
     elif reduce_op == ReduceOp.MIN:
         if np.issubdtype(dtype, np.integer):
             out = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype)
@@ -2339,17 +2282,14 @@ def convert_none_out(dtype, reduce_op, reduced_shape):
         out = np.zeros(reduced_shape, dtype=np.bool_)
     elif reduce_op == ReduceOp.ALL:
         out = np.ones(reduced_shape, dtype=np.bool_)
-    elif reduce_op == ReduceOp.ARGMIN:
-        if np.issubdtype(dtype, np.integer):
-            res_out_ = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype)
-        else:
-            res_out_ = np.inf * np.ones(reduced_shape, dtype=dtype)
-        out = (np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX), res_out_)
-    elif reduce_op == ReduceOp.ARGMAX:
+    elif reduce_op in {ReduceOp.ARGMIN, ReduceOp.ARGMAX}:
+        res_out_ = np.ones(reduced_shape, dtype=dtype)
         if np.issubdtype(dtype, np.integer):
-            res_out_ = np.iinfo(dtype).min * np.ones(reduced_shape, dtype=dtype)
+            res_out_ *= np.iinfo(dtype).max if reduce_op == ReduceOp.ARGMIN else np.iinfo(dtype).min
+        elif np.issubdtype(dtype, np.bool_):  # np.bool_ (not np.bool) also exists on NumPy < 2.0
+            res_out_ = res_out_ if reduce_op == ReduceOp.ARGMIN else np.zeros(reduced_shape, dtype=dtype)
         else:
-            res_out_ = -np.inf * np.ones(reduced_shape, dtype=dtype)
+            res_out_ *= np.inf if reduce_op == ReduceOp.ARGMIN else -np.inf
         out = (np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX), res_out_)
     return out if isinstance(out, tuple) else (out, None)


@@ -3141,6 +3081,34 @@ def argmin(
         }
         return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs)

+    def cumulative_sum(
+        self,
+        axis=None,
+        include_initial: bool = False,
+        fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT,
+        **kwargs,
+    ):
+        reduce_args = {
+            "op": ReduceOp.CUMULATIVE_SUM,
+            "axis": axis,
+            "include_initial": include_initial,
+        }
+        return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs)
+
+    def cumulative_prod(
+        self,
+        axis=None,
+        include_initial: bool = False,
+        fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT,
+        **kwargs,
+    ):
+        reduce_args = {
+            "op": ReduceOp.CUMULATIVE_PROD,
+            "axis": axis,
+            "include_initial": include_initial,
+        }
+        return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs)
+
     def _eval_constructor(self, expression, constructor, operands):
         """Evaluate a constructor function inside a string expression."""

diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py
index 13144f4c..cd1882dd 100644
--- a/src/blosc2/ndarray.py
+++ b/src/blosc2/ndarray.py
@@ -538,6 +538,78 @@ def sum(
     return ndarr.sum(axis=axis, dtype=dtype, keepdims=keepdims, **kwargs)


+def cumulative_sum(
+    ndarr: blosc2.Array,
+    axis: int | tuple[int] | None = None,
+    dtype: np.dtype | str = None,
+    include_initial: bool = False,
+ **kwargs: Any, +) -> blosc2.Array: + """ + Calculates the cumulative sum of elements in the input array ndarr. + + Parameters + ----------- + ndarr: :ref:`NDArray` or :ref:`NDField` or :ref:`C2Array` or :ref:`LazyExpr` + The input array or expression. + axis: int + Axis along which a cumulative sum must be computed. If axis is None, result is equivalent to cumulative sum of flattened array. + dtype: dtype + Data type of the returned array. + include_initial : bool + Boolean indicating whether to include the initial value as the first value in the output. Initial value will be zero. Default: False. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. + kwargs: dict, optional + Additional keyword arguments supported by the :func:`empty` constructor. + + Returns + ------- + out: blosc2.Array + An array containing the cumulative sums. Let N be the size of the axis along which to compute the cumulative sum. + If include_initial is True, the returned array has the same shape as ndarr, except the size of the axis along which to compute the cumulative sum is N+1. + If include_initial is False, the returned array has the same shape as ndarr. + """ + return ndarr.cumulative_sum(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + +def cumulative_prod( + ndarr: blosc2.Array, + axis: int | tuple[int] | None = None, + dtype: np.dtype | str = None, + include_initial: bool = False, + **kwargs: Any, +) -> blosc2.Array: + """ + Calculates the cumulative product of elements in the input array ndarr. + + Parameters + ----------- + ndarr: :ref:`NDArray` or :ref:`NDField` or :ref:`C2Array` or :ref:`LazyExpr` + The input array or expression. + axis: int + Axis along which a cumulative product must be computed. If axis is None, result is equivalent to cumulative product of flattened array. + dtype: dtype + Data type of the returned array. + include_initial : bool + Boolean indicating whether to include the initial value as the first value in the output. Initial value will be one. Default: False. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. + kwargs: dict, optional + Additional keyword arguments supported by the :func:`empty` constructor. + + Returns + ------- + out: blosc2.Array + An array containing the cumulative products. Let N be the size of the axis along which to compute the cumulative product. + If include_initial is True, the returned array has the same shape as ndarr, except the size of the axis along which to compute the cumulative product is N+1. + If include_initial is False, the returned array has the same shape as ndarr. 
+ """ + return ndarr.cumulative_prod(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + def mean( ndarr: blosc2.Array, axis: int | tuple[int] | None = None, @@ -3371,6 +3443,16 @@ def sum(self, axis=None, dtype=None, keepdims=False, **kwargs): expr = blosc2.LazyExpr(new_op=(self, None, None)) return expr.sum(axis=axis, dtype=dtype, keepdims=keepdims, **kwargs) + @is_documented_by(cumulative_sum) + def cumulative_sum(self, axis=None, dtype=None, include_initial=False, **kwargs): + expr = blosc2.LazyExpr(new_op=(self, None, None)) + return expr.cumulative_sum(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + @is_documented_by(cumulative_prod) + def cumulative_prod(self, axis=None, dtype=None, include_initial=False, **kwargs): + expr = blosc2.LazyExpr(new_op=(self, None, None)) + return expr.cumulative_prod(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + @is_documented_by(mean) def mean(self, axis=None, dtype=None, keepdims=False, **kwargs): expr = blosc2.LazyExpr(new_op=(self, None, None)) diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index 2903c8c3..47b65c3d 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -15,6 +15,8 @@ from ndindex.subindex_helpers import ceiling from numpy import broadcast_shapes +import blosc2 + # NumPy version and a convenient boolean flag NUMPY_GE_2_0 = np.__version__ >= "2.0" # handle different numpy versions @@ -781,3 +783,107 @@ def process_key(key, shape): ) # mask to track dummy dims introduced by int -> slice(k, k+1) key = tuple(slice(k, k + 1, None) if isinstance(k, int) else k for k in key) # key is slice, None, int return key, mask + + +def check_smaller_shape(value_shape, shape, slice_shape, slice_): + """Check whether the shape of the value is smaller than the shape of the array. + + This follows the NumPy broadcasting rules. + """ + # slice_shape must be as long as shape + if len(slice_shape) != len(slice_): + raise ValueError("slice_shape must be as long as slice_") + no_nones_shape = tuple(sh for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) + no_nones_slice = tuple(s for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) + is_smaller_shape = any( + s > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_shape) + ) + slice_past_bounds = any( + s.stop > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_slice) + ) + return len(value_shape) < len(shape) or is_smaller_shape or slice_past_bounds + + +def _compute_smaller_slice(larger_shape, smaller_shape, larger_slice): + smaller_slice = [] + diff_dims = len(larger_shape) - len(smaller_shape) + + for i in range(len(larger_shape)): + if i < diff_dims: + # For leading dimensions of the larger array that the smaller array doesn't have, + # we don't add anything to the smaller slice + pass + else: + # For dimensions that both arrays have, the slice for the smaller array should be + # the same as the larger array unless the smaller array's size along that dimension + # is 1, in which case we use None to indicate the full slice + if smaller_shape[i - diff_dims] != 1: + smaller_slice.append(larger_slice[i]) + else: + smaller_slice.append(slice(0, larger_shape[i])) + + return tuple(smaller_slice) + + +# A more compact version of the function above, albeit less readable +def compute_smaller_slice(larger_shape, smaller_shape, larger_slice): + """ + Returns the slice of the smaller array that corresponds to the slice of the larger array. 
+ """ + j_small = len(smaller_shape) - 1 + j_large = len(larger_shape) - 1 + smaller_shape_nones = [] + larger_shape_nones = [] + for s in reversed(larger_slice): + if s is None: + smaller_shape_nones.append(1) + larger_shape_nones.append(1) + else: + if j_small >= 0: + smaller_shape_nones.append(smaller_shape[j_small]) + j_small -= 1 + if j_large >= 0: + larger_shape_nones.append(larger_shape[j_large]) + j_large -= 1 + smaller_shape_nones.reverse() + larger_shape_nones.reverse() + diff_dims = len(larger_shape_nones) - len(smaller_shape_nones) + return tuple( + None + if larger_slice[i] is None + else ( + larger_slice[i] if smaller_shape_nones[i - diff_dims] != 1 else slice(0, larger_shape_nones[i]) + ) + for i in range(diff_dims, len(larger_shape_nones)) + ) + + +def _get_chunk_operands(operands, cslice, chunk_operands, shape): + # Get the starts and stops for the slice + cslice_shape = tuple(s.stop - s.start for s in cslice) + starts = [s.start if s.start is not None else 0 for s in cslice] + stops = [s.stop if s.stop is not None else sh for s, sh in zip(cslice, cslice_shape, strict=True)] + unit_steps = np.all([s.step == 1 for s in cslice]) + # Get the slice of each operand + for key, value in operands.items(): + if np.isscalar(value): + chunk_operands[key] = value + continue + if value.shape == (): + chunk_operands[key] = value[()] + continue + if check_smaller_shape(value.shape, shape, cslice_shape, cslice): + # We need to fetch the part of the value that broadcasts with the operand + smaller_slice = compute_smaller_slice(shape, value.shape, cslice) + chunk_operands[key] = value[smaller_slice] + continue + # If key is in operands, we can reuse the buffer + if ( + key in chunk_operands + and cslice_shape == chunk_operands[key].shape + and isinstance(value, blosc2.NDArray) + and unit_steps + ): + value.get_slice_numpy(chunk_operands[key], (starts, stops)) + continue + chunk_operands[key] = value[cslice] From 7f014d1bcfccd4da0447c7b901e60cd247ce7785 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Wed, 4 Feb 2026 10:34:23 +0100 Subject: [PATCH 2/7] Added tests, still need to handle broadcasting errors and some fp problems --- src/blosc2/lazyexpr.py | 110 +++++------- src/blosc2/ndarray.py | 4 +- src/blosc2/utils.py | 4 + tests/ndarray/test_reductions.py | 286 +++++++++++++++++++++++++------ 4 files changed, 287 insertions(+), 117 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index b1ef22a9..a603ab7b 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -13,6 +13,7 @@ import builtins import concurrent.futures import copy +import enum import inspect import linecache import math @@ -59,6 +60,8 @@ infer_shape, linalg_attrs, linalg_funcs, + npcumprod, + npcumsum, npvecdot, process_key, reducers, @@ -93,6 +96,8 @@ safe_numpy_globals["concat"] = np.concatenate safe_numpy_globals["matrix_transpose"] = np.transpose safe_numpy_globals["vecdot"] = npvecdot + safe_numpy_globals["cumulative_sum"] = npcumsum + safe_numpy_globals["cumulative_prod"] = npcumprod # Set this to False if miniexpr should not be tried out try_miniexpr = True @@ -271,25 +276,27 @@ class ReduceOp(Enum): Available reduce operations. 
""" - SUM = np.add - PROD = np.multiply - MEAN = np.mean - STD = np.std - VAR = np.var + # wrap as enum.member so that Python doesn't treat some funcs + # as class methods (rather than Enum members) + SUM = enum.member(np.add) + PROD = enum.member(np.multiply) + MEAN = enum.member(np.mean) + STD = enum.member(np.std) + VAR = enum.member(np.var) # Computing a median from partial results is not straightforward because the median # is a positional statistic, which means it depends on the relative ordering of all # the data points. Unlike statistics such as the sum or mean, you can't compute a median # from partial results without knowing the entire dataset, and this is way too expensive # for arrays that cannot typically fit in-memory (e.g. disk-based NDArray). # MEDIAN = np.median - MAX = np.maximum - MIN = np.minimum - ANY = np.any - ALL = np.all - ARGMAX = np.argmax - ARGMIN = np.argmin - CUMULATIVE_SUM = np.cumulative_sum - CUMULATIVE_PROD = np.cumulative_prod + MAX = enum.member(np.maximum) + MIN = enum.member(np.minimum) + ANY = enum.member(np.any) + ALL = enum.member(np.all) + ARGMAX = enum.member(np.argmax) + ARGMIN = enum.member(np.argmin) + CUMULATIVE_SUM = enum.member(npcumsum) + CUMULATIVE_PROD = enum.member(npcumprod) class LazyArrayEnum(Enum): @@ -1887,9 +1894,12 @@ def reduce_slices( # noqa: C901 reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: reduced_shape = (np.prod(shape_slice),) if reduce_args["axis"] is None else shape_slice - temp_axis = 0 if reduce_args["axis"] is None else reduce_args["axis"] + # if reduce_args["axis"] is None, have to have 1D input array + reduce_args["axis"] = 0 if reduce_args["axis"] is None else reduce_args["axis"] if include_initial: - reduced_shape = tuple(s + 1 if i == temp_axis else s for i, s in enumerate(shape_slice)) + reduced_shape = tuple( + s + 1 if i == reduce_args["axis"] else s for i, s in enumerate(shape_slice) + ) else: if keepdims: reduced_shape = tuple(1 if i in axis else s for i, s in enumerate(shape_slice)) @@ -1997,7 +2007,7 @@ def reduce_slices( # noqa: C901 # Padding blocks won't be written, so initial values matter for the final reduction if reduce_op in {ReduceOp.SUM, ReduceOp.ANY, ReduceOp.CUMULATIVE_SUM}: aux_reduc = np.zeros(nblocks, dtype=dtype) - elif reduce_op in {ReduceOp.PROD, ReduceOp.ALL, ReduceOp.CUMULATIVE_SUM}: + elif reduce_op in {ReduceOp.PROD, ReduceOp.ALL, ReduceOp.CUMULATIVE_PROD}: aux_reduc = np.ones(nblocks, dtype=dtype) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): @@ -2038,10 +2048,8 @@ def reduce_slices( # noqa: C901 # (continue to the manual chunked evaluation below) pass else: - if reduce_op == ReduceOp.ANY: - result = np.any(aux_reduc, **reduce_args) - elif reduce_op == ReduceOp.ALL: - result = np.all(aux_reduc, **reduce_args) + if reduce_op in {ReduceOp.ANY, ReduceOp.ALL}: + result = reduce_op.value(aux_reduc, **reduce_args) else: result = reduce_op.value.reduce(aux_reduc, **reduce_args) return result @@ -2054,30 +2062,6 @@ def reduce_slices( # noqa: C901 out_init = False for nchunk, chunk_slice in enumerate(intersecting_chunks): - # Special case for cumulative operations with axis = None - if reduce_args["axis"] is None and reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}: - # res_out_ is just None, out set to all 0s (sum) or 1s (prod) - out, res_out_ = convert_none_out(dtype, reduce_op, reduced_shape) - # reduced_shape is just one-element tuple - chunklen = 
out.chunks[0] if hasattr(out, "chunks") else chunks[-1] - carry = 0 - for cidx in range(0, reduced_shape[0] // chunklen): - slice_starts = np.unravel_index(cidx * chunklen, shape) - slice_stops = np.unravel_index((cidx + 1) * chunklen, shape) - cslice = tuple( - slice(start, stop) for start, stop in zip(slice_starts, slice_stops, strict=True) - ) - _get_chunk_operands(operands, cslice, chunk_operands, shape) - result, _ = _get_result(expression, chunk_operands, ne_args, where) - result = np.require(result, requirements="C") - if reduce_op == ReduceOp.CUMULATIVE_SUM: - res = np.cumulative_sum(result, axis=None) + carry - else: - res = np.cumulative_prod(result, axis=None) * carry - carry = res[-1] - out[cidx * chunklen + include_initial : (cidx + 1) * chunklen + include_initial] = res - break # skip looping over chunks - cslice = chunk_slice.raw # Check whether current cslice intersects with _slice if cslice != () and _slice != (): @@ -2146,16 +2130,8 @@ def reduce_slices( # noqa: C901 continue # Note that cslice_shape refers to slice of operand chunks, not reduced_slice result = np.full(cslice_shape, result[()]) - if reduce_op == ReduceOp.ANY: - result = np.any(result, **reduce_args) - elif reduce_op == ReduceOp.ALL: - result = np.all(result, **reduce_args) - elif reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: - result = ( - np.cumulative_sum(result, axis=reduce_args["axis"]) - if reduce_op == ReduceOp.CUMULATIVE_SUM - else np.cumulative_prod(result, axis=reduce_args["axis"]) - ) + if reduce_op in {ReduceOp.ANY, ReduceOp.ALL, ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + result = reduce_op.value(result, **reduce_args) elif reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: # offset for start of slice slice_ref = ( @@ -2166,11 +2142,7 @@ def reduce_slices( # noqa: C901 for s, sl in zip(starts, _slice, strict=True) ] ) - result_idx = ( - np.argmin(result, **reduce_args) - if reduce_op == ReduceOp.ARGMIN - else np.argmax(result, **reduce_args) - ) + result_idx = reduce_op.value(result, **reduce_args) if reduce_args["axis"] is None: # indexing into flattened array result = result[np.unravel_index(result_idx, shape=result.shape)] idx_within_cslice = np.unravel_index(result_idx, shape=cslice_shape) @@ -2189,7 +2161,7 @@ def reduce_slices( # noqa: C901 result = reduce_op.value.reduce(result, **reduce_args) if not out_init: - out_, res_out_ = convert_none_out(result.dtype, reduce_op, reduced_shape) + out_, res_out_ = convert_none_out(result.dtype, reduce_op, reduced_shape, axis=axis) if out is not None: out[:] = out_ del out_ @@ -2262,11 +2234,15 @@ def convert_none_out(dtype, reduce_op, reduced_shape, axis=None): else np.ones(reduced_shape, dtype=dtype) ) # Get res_out to hold running sums along axes for chunks when doing cumulative sums/prods with axis not None - res_out_ = ( - None - if axis is None - else np.empty(tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)), dtype=dtype) - ) + if reduce_op in {ReduceOp.SUM, ReduceOp.PROD}: + res_out_ = None + else: + temp_shape = tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)) + res_out_ = ( + np.zeros(temp_shape, dtype=dtype) + if reduce_op == ReduceOp.CUMULATIVE_SUM + else np.ones(temp_shape, dtype=dtype) + ) out = (out, res_out_) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): @@ -3093,6 +3069,8 @@ def cumulative_sum( "axis": axis, "include_initial": include_initial, } + if self.ndim != 1 and axis is None: + raise ValueError("axis must be specified for cumulative_sum of 
non-1D array.") return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) def cumulative_prod( @@ -3107,6 +3085,8 @@ def cumulative_prod( "axis": axis, "include_initial": include_initial, } + if self.ndim != 1 and axis is None: + raise ValueError("axis must be specified for cumulative_prod of non-1D array.") return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) def _eval_constructor(self, expression, constructor, operands): diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index cd1882dd..121d6a73 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -553,7 +553,7 @@ def cumulative_sum( ndarr: :ref:`NDArray` or :ref:`NDField` or :ref:`C2Array` or :ref:`LazyExpr` The input array or expression. axis: int - Axis along which a cumulative sum must be computed. If axis is None, result is equivalent to cumulative sum of flattened array. + Axis along which a cumulative sum must be computed. If array is 1D, axis may be None; otherwise the axis must be specified. dtype: dtype Data type of the returned array. include_initial : bool @@ -589,7 +589,7 @@ def cumulative_prod( ndarr: :ref:`NDArray` or :ref:`NDField` or :ref:`C2Array` or :ref:`LazyExpr` The input array or expression. axis: int - Axis along which a cumulative product must be computed. If axis is None, result is equivalent to cumulative product of flattened array. + Axis along which a cumulative product must be computed. If array is 1D, axis may be None; otherwise the axis must be specified. dtype: dtype Data type of the returned array. include_initial : bool diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index 47b65c3d..125b4cd6 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -26,11 +26,15 @@ npbinvert = np.bitwise_invert npvecdot = np.vecdot nptranspose = np.permute_dims + npcumsum = np.cumulative_sum + npcumprod = np.cumulative_prod else: # not array-api compliant nplshift = np.left_shift nprshift = np.right_shift npbinvert = np.bitwise_not nptranspose = np.transpose + npcumsum = np.cumsum + npcumprod = np.cumprod def npvecdot(a, b, axis=-1): return np.einsum("...i,...i->...", np.moveaxis(np.conj(a), axis, -1), np.moveaxis(b, axis, -1)) diff --git a/tests/ndarray/test_reductions.py b/tests/ndarray/test_reductions.py index a39ea52d..2d51a1ff 100644 --- a/tests/ndarray/test_reductions.py +++ b/tests/ndarray/test_reductions.py @@ -52,24 +52,33 @@ def array_fixture(dtype_fixture, shape_fixture): # @pytest.mark.parametrize("reduce_op", ["sum"]) -@pytest.mark.parametrize("reduce_op", ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin"]) +@pytest.mark.parametrize( + "reduce_op", + ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin", "cumulative_sum", "cumulative_prod"], +) def test_reduce_bool(array_fixture, reduce_op): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture expr = (a1 + a2) > (a3 * a4) nres = ne_evaluate("(na1 + na2) > (na3 * na4)") - res = getattr(expr, reduce_op)() - # res = expr.sum() - # print("res:", res) - nres = getattr(nres, reduce_op)() + axis = None + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 + nres = eval(f"np.{reduce_op}(nres, axis={axis})") + else: + nres = getattr(nres, reduce_op)(axis=axis) + res = getattr(expr, reduce_op)(axis=axis) tol = 1e-15 if a1.dtype == "float64" else 1e-6 np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # @pytest.mark.parametrize("reduce_op", ["sum"]) -@pytest.mark.parametrize("reduce_op", ["sum", "prod", "min", "max", "any", "all", "argmax", 
"argmin"]) +@pytest.mark.parametrize( + "reduce_op", + ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin", "cumulative_sum", "cumulative_prod"], +) def test_reduce_where(array_fixture, reduce_op): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture - if reduce_op == "prod": + if reduce_op in {"prod", "cumulative_prod"}: # To avoid overflow, create a1 and a2 with small values na1 = np.linspace(0, 0.1, np.prod(a1.shape), dtype=np.float32).reshape(a1.shape) a1 = blosc2.asarray(na1) @@ -80,8 +89,13 @@ def test_reduce_where(array_fixture, reduce_op): else: expr = blosc2.where(a1 < a2, a2, a1) nres = eval("np.where(na1 < na2, na2, na1)") - res = getattr(expr, reduce_op)() - nres = getattr(nres, reduce_op)() + axis = None + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 + nres = eval(f"np.{reduce_op}(nres, axis={axis})") + else: + nres = getattr(nres, reduce_op)(axis=axis) + res = getattr(expr, reduce_op)(axis=axis) # print("res:", res, nres, type(res), type(nres)) tol = 1e-15 if a1.dtype == "float64" else 1e-6 np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) @@ -105,7 +119,22 @@ def test_fp_accuracy(accuracy, dtype): @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "mean", "std", "var", "min", "max", "any", "all", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "mean", + "std", + "var", + "min", + "max", + "any", + "all", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [1, (0, 1), None]) @pytest.mark.parametrize("keepdims", [True, False]) @@ -117,11 +146,21 @@ def test_fp_accuracy(accuracy, dtype): @pytest.mark.heavy def test_reduce_params(array_fixture, axis, keepdims, dtype_out, reduce_op, kwargs): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture + reduce_args = {"axis": axis} + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + reduce_args["include_initial"] = keepdims + else: + reduce_args["keepdims"] = keepdims + if reduce_op in ("mean", "std") and dtype_out == np.int16: + # mean and std need float dtype as output + dtype_out = np.float64 + if reduce_op in ("sum", "prod", "mean", "std"): + reduce_args["dtype"] = dtype_out if axis is not None and np.isscalar(axis) and len(a1.shape) >= axis: return if isinstance(axis, tuple) and (len(a1.shape) < len(axis) or reduce_op in ("argmax", "argmin")): return - if reduce_op == "prod": + if reduce_op in {"prod", "cumulative_prod"}: # To avoid overflow, create a1 and a2 with small values na1 = np.linspace(0, 0.1, np.prod(a1.shape), dtype=np.float32).reshape(a1.shape) a1 = blosc2.asarray(na1) @@ -132,15 +171,9 @@ def test_reduce_params(array_fixture, axis, keepdims, dtype_out, reduce_op, kwar else: expr = a1 + a2 - a3 * a4 nres = eval("na1 + na2 - na3 * na4") - if reduce_op in ("sum", "prod", "mean", "std"): - if reduce_op in ("mean", "std") and dtype_out == np.int16: - # mean and std need float dtype as output - dtype_out = np.float64 - res = getattr(expr, reduce_op)(axis=axis, keepdims=keepdims, dtype=dtype_out, **kwargs) - nres = getattr(nres, reduce_op)(axis=axis, keepdims=keepdims, dtype=dtype_out) - else: - res = getattr(expr, reduce_op)(axis=axis, keepdims=keepdims, **kwargs) - nres = getattr(nres, reduce_op)(axis=axis, keepdims=keepdims) + + res = getattr(expr, reduce_op)(**reduce_args, **kwargs) + nres = getattr(nres, reduce_op)(**reduce_args) tol = 1e-15 if a1.dtype == "float64" else 1e-6 if kwargs != {}: if not np.isscalar(res): @@ -152,7 +185,8 @@ def test_reduce_params(array_fixture, axis, keepdims, dtype_out, 
reduce_op, kwar # TODO: "prod" is not supported here because it overflows with current values @pytest.mark.parametrize( - "reduce_op", ["sum", "min", "max", "mean", "std", "var", "any", "all", "argmax", "argmin"] + "reduce_op", + ["sum", "min", "max", "mean", "std", "var", "any", "all", "argmax", "argmin", "cumulative_sum"], ) @pytest.mark.parametrize("axis", [0, 1, None]) def test_reduce_expr_arr(array_fixture, axis, reduce_op): @@ -161,15 +195,33 @@ def test_reduce_expr_arr(array_fixture, axis, reduce_op): return expr = a1 + a2 - a3 * a4 nres = eval("na1 + na2 - na3 * na4") + if reduce_op == "cumulative_sum": + axis = 0 if axis is None else axis + nres = nres.cumsum(axis=axis) + na1.cumsum(axis=axis) + else: + nres = getattr(nres, reduce_op)(axis=axis) + getattr(na1, reduce_op)(axis=axis) res = getattr(expr, reduce_op)(axis=axis) + getattr(a1, reduce_op)(axis=axis) - nres = getattr(nres, reduce_op)(axis=axis) + getattr(na1, reduce_op)(axis=axis) tol = 1e-15 if a1.dtype == "float64" else 1e-6 np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # Test broadcasting @pytest.mark.parametrize( - "reduce_op", ["sum", "mean", "std", "var", "min", "max", "any", "all", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "mean", + "std", + "var", + "min", + "max", + "any", + "all", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) @pytest.mark.parametrize("keepdims", [True, False]) @@ -182,8 +234,14 @@ def test_reduce_expr_arr(array_fixture, axis, reduce_op): ], ) def test_broadcast_params(axis, keepdims, reduce_op, shapes): - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis + reduce_args = {"axis": axis} + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + reduce_args["include_initial"] = keepdims + else: + reduce_args["keepdims"] = keepdims na1 = np.linspace(0, 1, np.prod(shapes[0])).reshape(shapes[0]) na2 = np.linspace(1, 2, np.prod(shapes[1])).reshape(shapes[1]) na3 = np.linspace(2, 3, np.prod(shapes[2])).reshape(shapes[2]) @@ -194,10 +252,14 @@ def test_broadcast_params(axis, keepdims, reduce_op, shapes): assert expr1.shape == shapes[0] expr2 = a1 * a2 + 1 assert expr2.shape == shapes[0] - res = expr1 - getattr(expr2, reduce_op)(axis=axis, keepdims=keepdims) + res = expr1 - getattr(expr2, reduce_op)(**reduce_args) assert res.shape == shapes[0] # print(f"res: {res.shape} expr1: {expr1.shape} expr2: {expr2.shape}") - nres = eval(f"na1 + na2 - na3 - (na1 * na2 + 1).{reduce_op}(axis={axis}, keepdims={keepdims})") + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + expr = f"na1 + na2 - na3 - np.{reduce_op}(na1 * na2 + 1, axis={axis}, include_initial={keepdims})" + else: + expr = f"na1 + na2 - na3 - (na1 * na2 + 1).{reduce_op}(axis={axis}, keepdims={keepdims})" + nres = eval(expr) tol = 1e-14 if a1.dtype == "float64" else 1e-5 np.testing.assert_allclose(res[:], nres, atol=tol, rtol=tol) @@ -205,7 +267,22 @@ def test_broadcast_params(axis, keepdims, reduce_op, shapes): # Test reductions with item parameter @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", 
+ ], ) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) @pytest.mark.parametrize("stripes", ["rows", "columns"]) @@ -223,7 +300,7 @@ def test_reduce_item(reduce_op, dtype, stripes, stripe_len, shape, chunks): else: _slice = (slice(None), slice(i, i + stripe_len)) slice_ = na[_slice] - if slice_.size == 0 and reduce_op not in ("sum", "prod"): + if slice_.size == 0 and reduce_op not in ("sum", "prod", "cumulative_sum", "cumulative_prod"): # For mean, std, and var, numpy just raises a warning, so don't check if reduce_op in ("min", "max", "argmin", "argmax"): # Check that a ValueError is raised when the slice is empty @@ -238,7 +315,22 @@ def test_reduce_item(reduce_op, dtype, stripes, stripe_len, shape, chunks): @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) def test_reduce_slice(reduce_op): shape = (8, 12, 5) @@ -282,7 +374,22 @@ def test_reduce_slice(reduce_op): @pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, 1, None]) def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis): @@ -295,23 +402,41 @@ def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis): if disk: a = blosc2.open(urlpath) na = a[:] - + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 if axis is None else axis + nres = eval(f"np.{reduce_op}(na, axis={axis})") + else: + nres = getattr(na, reduce_op)(axis=axis) res = getattr(a, reduce_op)(axis=axis) - nres = getattr(na, reduce_op)(axis=axis) - assert np.allclose(res, nres) @pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version1(disk, fill_value, reduce_op, axis): shape = (20, 50, 100) - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis shape = (20, 20, 100) urlpath = "a1.b2nd" if disk else None if fill_value != 0: @@ -334,7 +459,10 @@ def test_save_version1(disk, fill_value, reduce_op, axis): lexpr.save("out.b2nd") lexpr = blosc2.open("out.b2nd") res = lexpr.compute() - nres = na + getattr(nb[()], reduce_op)(axis=axis) + 1 + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + nres = na + eval(f"np.{reduce_op}(nb, axis={axis})") + 1 + else: + nres = na + getattr(nb, reduce_op)(axis=axis) + 1 assert np.allclose(res[()], nres) if disk: @@ -346,13 +474,29 @@ def test_save_version1(disk, fill_value, reduce_op, axis): @pytest.mark.parametrize("disk", [True, 
False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version2(disk, fill_value, reduce_op, axis): shape = (20, 50, 100) - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis shape = (20, 20, 100) urlpath = "a1.b2nd" if disk else None if fill_value != 0: @@ -374,7 +518,10 @@ def test_save_version2(disk, fill_value, reduce_op, axis): lexpr.save("out.b2nd") lexpr = blosc2.open("out.b2nd") res = lexpr.compute() - nres = getattr(na[()], reduce_op)(axis=axis) + nb + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + nres = eval(f"np.{reduce_op}(na, axis={axis})") + nb + else: + nres = getattr(na, reduce_op)(axis=axis) + nb assert np.allclose(res[()], nres) if disk: @@ -386,13 +533,29 @@ def test_save_version2(disk, fill_value, reduce_op, axis): @pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version3(disk, fill_value, reduce_op, axis): shape = (20, 50, 100) - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis shape = (20, 20, 100) urlpath = "a1.b2nd" if disk else None if fill_value != 0: @@ -414,7 +577,10 @@ def test_save_version3(disk, fill_value, reduce_op, axis): lexpr.save("out.b2nd") lexpr = blosc2.open("out.b2nd") res = lexpr.compute() - nres = getattr(na[()], reduce_op)(axis=axis) + nb + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + nres = eval(f"np.{reduce_op}(na, axis={axis})") + nb + else: + nres = getattr(na, reduce_op)(axis=axis) + nb assert np.allclose(res[()], nres) if disk: @@ -426,12 +592,29 @@ def test_save_version3(disk, fill_value, reduce_op, axis): @pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version4(disk, fill_value, reduce_op, axis): - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis + shape = (20, 20, 100) shape = 
(20, 50, 100) urlpath = "a1.b2nd" if disk else None if fill_value != 0: @@ -452,7 +635,10 @@ def test_save_version4(disk, fill_value, reduce_op, axis): lexpr.save("out.b2nd") lexpr = blosc2.open("out.b2nd") res = lexpr.compute() - nres = getattr(na[()], reduce_op)(axis=axis) + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + nres = eval(f"np.{reduce_op}(na, axis={axis})") + else: + nres = getattr(na, reduce_op)(axis=axis) assert np.allclose(res[()], nres) if disk: From 8302b6fd8494bc997a020af37f01d94112178be8 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Wed, 4 Feb 2026 10:43:38 +0100 Subject: [PATCH 3/7] Handle cumulative_sum/cumsum in numpy <2.1 --- src/blosc2/utils.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index 125b4cd6..1edf7d27 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -26,8 +26,12 @@ npbinvert = np.bitwise_invert npvecdot = np.vecdot nptranspose = np.permute_dims - npcumsum = np.cumulative_sum - npcumprod = np.cumulative_prod + if np.__version__ >= "2.1": + npcumsum = np.cumulative_sum + npcumprod = np.cumulative_prod + else: + npcumsum = np.cumsum + npcumprod = np.cumprod else: # not array-api compliant nplshift = np.left_shift nprshift = np.right_shift From 6bf22afae7c89a7ffcaec96128dd3e61302ffd68 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Wed, 4 Feb 2026 12:36:11 +0100 Subject: [PATCH 4/7] Almost all tests pass --- src/blosc2/lazyexpr.py | 23 +++++++++----- tests/ndarray/test_reductions.py | 54 ++++++++++++++++++++------------ 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index a603ab7b..cca0f391 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -1860,7 +1860,7 @@ def reduce_slices( # noqa: C901 reduce_op_str = reduce_args.pop("op_str", None) axis = reduce_args["axis"] keepdims = reduce_args.get("keepdims", False) - include_initial = reduce_args.get("include_initial", False) + include_initial = reduce_args.pop("include_initial", False) dtype = reduce_args.get("dtype", None) if dtype is None: dtype = kwargs.pop("dtype", None) @@ -1894,8 +1894,8 @@ def reduce_slices( # noqa: C901 reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: reduced_shape = (np.prod(shape_slice),) if reduce_args["axis"] is None else shape_slice - # if reduce_args["axis"] is None, have to have 1D input array - reduce_args["axis"] = 0 if reduce_args["axis"] is None else reduce_args["axis"] + # if reduce_args["axis"] is None, have to have 1D input array; otherwise, ensure positive scalar + reduce_args["axis"] = 0 if reduce_args["axis"] is None else axis[0] if include_initial: reduced_shape = tuple( s + 1 if i == reduce_args["axis"] else s for i, s in enumerate(shape_slice) @@ -2093,7 +2093,7 @@ def reduce_slices( # noqa: C901 if reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}: reduced_slice = ( tuple( - slice(sl.start + 1, sl.stop + 1, sl.step) if i in axis else sl + slice(sl.start + 1, sl.stop + 1, sl.step) if i == reduce_args["axis"] else sl for i, sl in enumerate(cslice_subidx) ) if include_initial @@ -2161,7 +2161,11 @@ def reduce_slices( # noqa: C901 result = reduce_op.value.reduce(result, **reduce_args) if not out_init: - out_, res_out_ = convert_none_out(result.dtype, reduce_op, reduced_shape, axis=axis) + out_, res_out_ = convert_none_out( + result.dtype, reduce_op, reduced_shape, axis=reduce_args["axis"] 
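Patch 3 above aliases npcumsum/npcumprod to np.cumsum/np.cumprod on NumPy < 2.1, which silently drops include_initial support on old NumPy. If that keyword ever needs to work there too, it can be emulated; a sketch with a hypothetical helper name, not code from this series:

    import numpy as np

    def cumulative_sum_compat(a, axis, include_initial=False):
        out = np.cumsum(a, axis=axis)
        if include_initial:
            # Prepend the identity element (0 for a sum) along the reduced
            # axis, matching np.cumulative_sum(..., include_initial=True).
            pad_shape = list(a.shape)
            pad_shape[axis] = 1
            out = np.concatenate([np.zeros(pad_shape, dtype=out.dtype), out], axis=axis)
        return out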
+ ) + # if reduce_op == ReduceOp.CUMULATIVE_SUM: + # kahan_sum = np.zeros_like(res_out_) if out is not None: out[:] = out_ del out_ @@ -2188,12 +2192,17 @@ def reduce_slices( # noqa: C901 else: # CUMULATIVE_SUM or CUMULATIVE_PROD idx_result = tuple( slice(-1, None) if i == reduce_args["axis"] else slice(None, None) - for i, c in enumerate(cslice) + for i, c in enumerate(reduced_slice) ) idx_lastval = tuple( - slice(0, 1) if i == reduce_args["axis"] else c for i, c in enumerate(cslice) + slice(0, 1) if i == reduce_args["axis"] else c for i, c in enumerate(reduced_slice) ) if reduce_op == ReduceOp.CUMULATIVE_SUM: + # use Kahan summation algorithm for better precision + # y = res_out_[idx_lastval] - kahan_sum[idx_lastval] + # t = result + y + # kahan_sum[idx_lastval] = ((t - result) - y)[idx_result] + # result = t result += res_out_[idx_lastval] else: # CUMULATIVE_PROD result *= res_out_[idx_lastval] diff --git a/tests/ndarray/test_reductions.py b/tests/ndarray/test_reductions.py index 2d51a1ff..1ab9d41f 100644 --- a/tests/ndarray/test_reductions.py +++ b/tests/ndarray/test_reductions.py @@ -11,7 +11,7 @@ import pytest import blosc2 -from blosc2.lazyexpr import ne_evaluate +from blosc2.lazyexpr import ne_evaluate, npcumsum NITEMS_SMALL = 1000 NITEMS = 10_000 @@ -63,7 +63,8 @@ def test_reduce_bool(array_fixture, reduce_op): axis = None if reduce_op in {"cumulative_sum", "cumulative_prod"}: axis = 0 - nres = eval(f"np.{reduce_op}(nres, axis={axis})") + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(nres, axis={axis})") else: nres = getattr(nres, reduce_op)(axis=axis) res = getattr(expr, reduce_op)(axis=axis) @@ -92,7 +93,8 @@ def test_reduce_where(array_fixture, reduce_op): axis = None if reduce_op in {"cumulative_sum", "cumulative_prod"}: axis = 0 - nres = eval(f"np.{reduce_op}(nres, axis={axis})") + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(nres, axis={axis})") else: nres = getattr(nres, reduce_op)(axis=axis) res = getattr(expr, reduce_op)(axis=axis) @@ -197,7 +199,7 @@ def test_reduce_expr_arr(array_fixture, axis, reduce_op): nres = eval("na1 + na2 - na3 * na4") if reduce_op == "cumulative_sum": axis = 0 if axis is None else axis - nres = nres.cumsum(axis=axis) + na1.cumsum(axis=axis) + nres = npcumsum(nres, axis=axis) + npcumsum(na1, axis=axis) else: nres = getattr(nres, reduce_op)(axis=axis) + getattr(na1, reduce_op)(axis=axis) res = getattr(expr, reduce_op)(axis=axis) + getattr(a1, reduce_op)(axis=axis) @@ -236,7 +238,7 @@ def test_reduce_expr_arr(array_fixture, axis, reduce_op): def test_broadcast_params(axis, keepdims, reduce_op, shapes): if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): axis = 1 if isinstance(axis, tuple) else axis - axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis + axis = 0 if reduce_op[:3] == "cum" else axis reduce_args = {"axis": axis} if reduce_op in {"cumulative_sum", "cumulative_prod"}: reduce_args["include_initial"] = keepdims @@ -250,15 +252,16 @@ def test_broadcast_params(axis, keepdims, reduce_op, shapes): a3 = blosc2.asarray(na3) expr1 = a1 + a2 - a3 assert expr1.shape == shapes[0] - expr2 = a1 * a2 + 1 - assert expr2.shape == shapes[0] - res = expr1 - getattr(expr2, reduce_op)(**reduce_args) - assert res.shape == shapes[0] + expr2 = a2 * a3 + 1 + assert expr2.shape == shapes[1] # print(f"res: {res.shape} expr1: {expr1.shape} expr2: {expr2.shape}") if reduce_op in {"cumulative_sum", "cumulative_prod"}: - expr = f"na1 + na2 
- na3 - np.{reduce_op}(na1 * na2 + 1, axis={axis}, include_initial={keepdims})" + res = expr2 - getattr(expr1, reduce_op)(**reduce_args) + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + expr = f"na2 * na3 + 1 - {oploc}(na1 + na2 - na3, axis={axis}, include_initial={keepdims})" else: - expr = f"na1 + na2 - na3 - (na1 * na2 + 1).{reduce_op}(axis={axis}, keepdims={keepdims})" + res = expr1 - getattr(expr2, reduce_op)(**reduce_args) + expr = f"na1 + na2 - na3 - (na2 * na3 + 1).{reduce_op}(axis={axis}, keepdims={keepdims})" nres = eval(expr) tol = 1e-14 if a1.dtype == "float64" else 1e-5 @@ -339,24 +342,30 @@ def test_reduce_slice(reduce_op): tol = 1e-6 if na.dtype == np.float32 else 1e-15 _slice = (slice(1, 2, 1), slice(3, 7, 1)) res = getattr(a, reduce_op)(item=_slice, axis=-1) - nres = getattr(na[_slice], reduce_op)(axis=-1) + if reduce_op == "cumulative_sum": + oploc = "npcumsum" + elif reduce_op == "cumulative_prod": + oploc = "npcumprod" + else: + oploc = f"np.{reduce_op}" + nres = eval(f"{oploc}(na[_slice], axis=-1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # Test reductions with slices and strides _slice = (slice(1, 2, 1), slice(1, 9, 2)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # Test reductions with ints _slice = (0, slice(1, 9, 1)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) _slice = (0, slice(1, 9, 2)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) @@ -404,7 +413,8 @@ def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis): na = a[:] if reduce_op in {"cumulative_sum", "cumulative_prod"}: axis = 0 if axis is None else axis - nres = eval(f"np.{reduce_op}(na, axis={axis})") + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(na, axis={axis})") else: nres = getattr(na, reduce_op)(axis=axis) res = getattr(a, reduce_op)(axis=axis) @@ -460,7 +470,8 @@ def test_save_version1(disk, fill_value, reduce_op, axis): lexpr = blosc2.open("out.b2nd") res = lexpr.compute() if reduce_op in {"cumulative_sum", "cumulative_prod"}: - nres = na + eval(f"np.{reduce_op}(nb, axis={axis})") + 1 + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = na + eval(f"{oploc}(nb, axis={axis})") + 1 else: nres = na + getattr(nb, reduce_op)(axis=axis) + 1 assert np.allclose(res[()], nres) @@ -519,7 +530,8 @@ def test_save_version2(disk, fill_value, reduce_op, axis): lexpr = blosc2.open("out.b2nd") res = lexpr.compute() if reduce_op in {"cumulative_sum", "cumulative_prod"}: - nres = eval(f"np.{reduce_op}(na, axis={axis})") + nb + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(na, axis={axis})") + nb else: nres = getattr(na, reduce_op)(axis=axis) + nb assert np.allclose(res[()], nres) @@ -578,7 +590,8 @@ def test_save_version3(disk, fill_value, reduce_op, axis): lexpr = blosc2.open("out.b2nd") res = lexpr.compute() if reduce_op in {"cumulative_sum", "cumulative_prod"}: - nres = eval(f"np.{reduce_op}(na, axis={axis})") + nb + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" 
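The reference results in these tests are built by formatting an oploc name into a string and calling eval(). An equivalent eval-free formulation would dispatch through a small function map; a sketch only, and note it assumes both npcumsum and npcumprod are importable from blosc2.lazyexpr, while the import hunk earlier in this patch only adds npcumsum:

    from blosc2.lazyexpr import npcumsum, npcumprod  # assumed exports

    _CUM_OPS = {"cumulative_sum": npcumsum, "cumulative_prod": npcumprod}

    def reference_reduce(na, reduce_op, axis):
        # NumPy reference value for one reduce_op, mirroring the eval() calls.
        if reduce_op in _CUM_OPS:
            return _CUM_OPS[reduce_op](na, axis=axis)
        return getattr(na, reduce_op)(axis=axis)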
+ nres = eval(f"{oploc}(na, axis={axis})") + nb else: nres = getattr(na, reduce_op)(axis=axis) + nb assert np.allclose(res[()], nres) @@ -636,7 +649,8 @@ def test_save_version4(disk, fill_value, reduce_op, axis): lexpr = blosc2.open("out.b2nd") res = lexpr.compute() if reduce_op in {"cumulative_sum", "cumulative_prod"}: - nres = eval(f"np.{reduce_op}(na, axis={axis})") + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(na, axis={axis})") else: nres = getattr(na, reduce_op)(axis=axis) assert np.allclose(res[()], nres) From de21fb57805cc45d74322bc5d273dfeafb2aaf37 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Thu, 5 Feb 2026 09:21:04 +0100 Subject: [PATCH 5/7] Add loop over reduction axis --- src/blosc2/lazyexpr.py | 91 +++++++++++++++++++++++------------------- src/blosc2/utils.py | 68 +++++++++++++++++++++++++++---- 2 files changed, 111 insertions(+), 48 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index cca0f391..06ff5cd1 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -2058,8 +2058,14 @@ def reduce_slices( # noqa: C901 chunk_operands = {} # Check which chunks intersect with _slice # if chunks has 0 we loop once but fast path is false as gives error (schunk has no chunks) - intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) + if ( + np.isscalar(reduce_args["axis"]) and not fast_path + ): # iterate over chunks incrementing along reduction axis + intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks, axis=reduce_args["axis"]) + else: # iterate over chunks incrementing along last axis + intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) out_init = False + res_out_init = False for nchunk, chunk_slice in enumerate(intersecting_chunks): cslice = chunk_slice.raw @@ -2161,9 +2167,7 @@ def reduce_slices( # noqa: C901 result = reduce_op.value.reduce(result, **reduce_args) if not out_init: - out_, res_out_ = convert_none_out( - result.dtype, reduce_op, reduced_shape, axis=reduce_args["axis"] - ) + out_ = convert_none_out(result.dtype, reduce_op, reduced_shape) # if reduce_op == ReduceOp.CUMULATIVE_SUM: # kahan_sum = np.zeros_like(res_out_) if out is not None: @@ -2173,6 +2177,12 @@ def reduce_slices( # noqa: C901 out = out_ out_init = True + if (reduce_args["axis"] is None and not res_out_init) or ( + np.isscalar(reduce_args["axis"]) and cslice_subidx[reduce_args["axis"]].start == 0 + ): # starting reduction again along axis + res_out_ = _get_res_out(result.shape, reduce_args["axis"], dtype, reduce_op) + res_out_init = True + # Update the output array with the result if reduce_op == ReduceOp.ANY: out[reduced_slice] += result @@ -2181,33 +2191,29 @@ def reduce_slices( # noqa: C901 elif res_out_ is not None: # need lowest index for which optimum attained if reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: - cond = (res_out_[reduced_slice] == result) & (result_idx < out[reduced_slice]) - cond |= ( - res_out_[reduced_slice] < result - if reduce_op == ReduceOp.ARGMAX - else res_out_[reduced_slice] > result - ) + cond = (res_out_ == result) & (result_idx < out[reduced_slice]) + cond |= res_out_ < result if reduce_op == ReduceOp.ARGMAX else res_out_ > result out[reduced_slice] = np.where(cond, result_idx, out[reduced_slice]) - res_out_[reduced_slice] = np.where(cond, result, res_out_[reduced_slice]) + res_out_ = np.where(cond, result, res_out_) else: # CUMULATIVE_SUM or CUMULATIVE_PROD idx_result = tuple( slice(-1, None) if i == reduce_args["axis"] else slice(None, 
None) for i, c in enumerate(reduced_slice) ) - idx_lastval = tuple( - slice(0, 1) if i == reduce_args["axis"] else c for i, c in enumerate(reduced_slice) - ) + # idx_lastval = tuple( + # slice(0, 1) if i == reduce_args["axis"] else c for i, c in enumerate(reduced_slice) + # ) if reduce_op == ReduceOp.CUMULATIVE_SUM: # use Kahan summation algorithm for better precision # y = res_out_[idx_lastval] - kahan_sum[idx_lastval] # t = result + y # kahan_sum[idx_lastval] = ((t - result) - y)[idx_result] # result = t - result += res_out_[idx_lastval] + result += res_out_ else: # CUMULATIVE_PROD - result *= res_out_[idx_lastval] + result *= res_out_ out[reduced_slice] = result - res_out_[idx_lastval] = result[idx_result] + res_out_ = result[idx_result] else: out[reduced_slice] = reduce_op.value(out[reduced_slice], result) @@ -2220,7 +2226,7 @@ def reduce_slices( # noqa: C901 if dtype is None: # We have no hint here, so choose a default dtype dtype = np.float64 - out, _ = convert_none_out(dtype, reduce_op, reduced_shape) + out = convert_none_out(dtype, reduce_op, reduced_shape) out = out[()] if reduced_shape == () else out # undo dummy dim from inside convert_none_out final_mask = tuple(np.where(mask_slice)[0]) @@ -2232,8 +2238,31 @@ def reduce_slices( # noqa: C901 return out -def convert_none_out(dtype, reduce_op, reduced_shape, axis=None): - out = None +def _get_res_out(reduced_shape, axis, dtype, reduce_op): + reduced_shape = (1,) if reduced_shape == () else reduced_shape + # Get res_out to hold running sums along axes for chunks when doing cumulative sums/prods with axis not None + if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + temp_shape = tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)) + res_out_ = ( + np.zeros(temp_shape, dtype=dtype) + if reduce_op == ReduceOp.CUMULATIVE_SUM + else np.ones(temp_shape, dtype=dtype) + ) + elif reduce_op in {ReduceOp.ARGMIN, ReduceOp.ARGMAX}: + temp_shape = reduced_shape + res_out_ = np.ones(temp_shape, dtype=dtype) + if np.issubdtype(dtype, np.integer): + res_out_ *= np.iinfo(dtype).max if reduce_op == ReduceOp.ARGMIN else np.iinfo(dtype).min + elif np.issubdtype(dtype, np.bool): + res_out_ = res_out_ if reduce_op == ReduceOp.ARGMIN else np.zeros(temp_shape, dtype=dtype) + else: + res_out_ *= np.inf if reduce_op == ReduceOp.ARGMIN else -np.inf + else: + res_out_ = None + return res_out_ + + +def convert_none_out(dtype, reduce_op, reduced_shape): reduced_shape = (1,) if reduced_shape == () else reduced_shape # out will be a proper numpy.ndarray if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_PROD}: @@ -2242,17 +2271,6 @@ def convert_none_out(dtype, reduce_op, reduced_shape, axis=None): if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM} else np.ones(reduced_shape, dtype=dtype) ) - # Get res_out to hold running sums along axes for chunks when doing cumulative sums/prods with axis not None - if reduce_op in {ReduceOp.SUM, ReduceOp.PROD}: - res_out_ = None - else: - temp_shape = tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)) - res_out_ = ( - np.zeros(temp_shape, dtype=dtype) - if reduce_op == ReduceOp.CUMULATIVE_SUM - else np.ones(temp_shape, dtype=dtype) - ) - out = (out, res_out_) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): out = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype) @@ -2268,15 +2286,8 @@ def convert_none_out(dtype, reduce_op, reduced_shape, axis=None): elif reduce_op == ReduceOp.ALL: out = np.ones(reduced_shape, 
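The res_out_ bookkeeping above carries the last cumulated value of each chunk into the next chunk along the reduction axis. Stripped of the chunk-slice machinery, the idea reduces to this standalone 1-D sketch, which is not the library code:

    import numpy as np

    def chunked_cumsum(chunks):
        # Cumulative sum over a sequence of non-empty 1-D chunks.
        carry = 0.0
        parts = []
        for c in chunks:
            part = np.cumsum(c) + carry  # shift by the total of earlier chunks
            carry = part[-1]             # last value seeds the next chunk
            parts.append(part)
        return np.concatenate(parts)

    # chunked_cumsum([np.ones(3), np.ones(2)]) -> [1., 2., 3., 4., 5.]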
dtype=np.bool_) elif reduce_op in {ReduceOp.ARGMIN, ReduceOp.ARGMAX}: - res_out_ = np.ones(reduced_shape, dtype=dtype) - if np.issubdtype(dtype, np.integer): - res_out_ *= np.iinfo(dtype).max if reduce_op == ReduceOp.ARGMIN else np.iinfo(dtype).min - elif np.issubdtype(dtype, np.bool): - res_out_ = res_out_ if reduce_op == ReduceOp.ARGMIN else np.zeros(reduced_shape, dtype=dtype) - else: - res_out_ *= np.inf if reduce_op == ReduceOp.ARGMIN else -np.inf - out = (np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX), res_out_) - return out if isinstance(out, tuple) else (out, None) + out = np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX) + return out def chunked_eval( diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index 1edf7d27..b326aa7b 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -9,6 +9,7 @@ import builtins import math import warnings +from itertools import product import ndindex import numpy as np @@ -768,14 +769,65 @@ def _get_local_slice(prior_selection, post_selection, chunk_bounds): return locbegin, locend -def get_intersecting_chunks(_slice, shape, chunks): - if 0 not in chunks: - chunk_size = ndindex.ChunkSize(chunks) - return chunk_size.as_subchunks(_slice, shape) # if _slice is (), returns all chunks - else: - return ( - ndindex.ndindex(...).expand(shape), - ) # chunk is whole array so just return full tuple to do loop once +def get_intersecting_chunks(idx, shape, chunks, axis=None): + if 0 in chunks: # chunk is whole array so just return full tuple to do loop once + return (ndindex.ndindex(...).expand(shape),) + chunk_size = ndindex.ChunkSize(chunks) + if axis is None: + return chunk_size.as_subchunks(idx, shape) # if _slice is (), returns all chunks + + def return_my_it(chunk_size, idx, shape, axis): + # special algorithm to iterate over axis first (adapted from ndindex source) + shape = ndindex.shapetools.asshape(shape) + + idx = ndindex.ndindex(idx).expand(shape) + + iters = [] + idx_args = tuple(a for i, a in enumerate(idx.args) if i != axis) + (idx.args[axis],) + idx_args = iter(idx_args) # iterate over tuple of slices in order + self_ = tuple(a for i, a in enumerate(chunk_size) if i != axis) + (chunk_size[axis],) + self_ = iter(self_) # iterate over chunk_shape in order + while True: + try: + i = next(idx_args) # slice along axis + n = next(self_) # chunklen along dimension + except StopIteration: + break + if not isinstance(i, ndindex.Slice): + raise ValueError("Only slices may be used with axis arg") + + def _slice_iter(s, n): + a, N, m = s.args + if m > n: + yield from ((a + k * m) // n for k in range(ceiling(N - a, m))) + else: + yield from range(a // n, ceiling(N, n)) + + iters.append(_slice_iter(i, n)) + + def _indices(iters): + my_list = [ndindex.Slice(None, None)] * len(chunk_size) + for p in product(*iters): + # p increments over arg axis first before other axes + # p = (...., -1, axis) + my_list[:axis] = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape[:axis], chunk_size[:axis], p[:axis], strict=True) + ] + n, cs, ci = shape[-1], chunk_size[-1], p[-1] + my_list[axis] = ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + my_list[axis + 1 :] = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape[axis:-1], chunk_size[axis:-1], p[axis:-1], strict=True) + ] + yield ndindex.Tuple(*my_list) + + for c in _indices(iters): + # Empty indices should be impossible by the construction of the + # iterators above. 
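The _slice_iter helper above decides which chunk indices a slice touches along one axis, with separate handling for steps larger than the chunk length. The same arithmetic in isolation, as a hypothetical helper that assumes positive start/stop/step and uses math.ceil in place of ndindex's two-argument ceiling:

    import math

    def chunks_touched(start, stop, step, chunklen):
        # Chunk indices hit by slice(start, stop, step) along one dimension.
        if step > chunklen:
            # Points are sparser than chunks: map each point to its chunk.
            npoints = math.ceil((stop - start) / step)
            return [(start + k * step) // chunklen for k in range(npoints)]
        # Points are dense enough that no chunk in between is skipped.
        return list(range(start // chunklen, math.ceil(stop / chunklen)))

    # chunks_touched(3, 20, 7, 5) -> [0, 2, 3]   (points 3, 10, 17)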
+ yield from c + + return return_my_it(chunk_size, idx, shape, axis) def get_chunks_idx(shape, chunks): From f41850244562d9295ec9d532bb4f1b6dfa334657 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Thu, 5 Feb 2026 10:13:39 +0100 Subject: [PATCH 6/7] Revert "Add loop over reduction axis" This reverts commit de21fb57805cc45d74322bc5d273dfeafb2aaf37. --- src/blosc2/lazyexpr.py | 91 +++++++++++++++++++----------------------- src/blosc2/utils.py | 68 ++++--------------------------- 2 files changed, 48 insertions(+), 111 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 06ff5cd1..cca0f391 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -2058,14 +2058,8 @@ def reduce_slices( # noqa: C901 chunk_operands = {} # Check which chunks intersect with _slice # if chunks has 0 we loop once but fast path is false as gives error (schunk has no chunks) - if ( - np.isscalar(reduce_args["axis"]) and not fast_path - ): # iterate over chunks incrementing along reduction axis - intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks, axis=reduce_args["axis"]) - else: # iterate over chunks incrementing along last axis - intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) + intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) out_init = False - res_out_init = False for nchunk, chunk_slice in enumerate(intersecting_chunks): cslice = chunk_slice.raw @@ -2167,7 +2161,9 @@ def reduce_slices( # noqa: C901 result = reduce_op.value.reduce(result, **reduce_args) if not out_init: - out_ = convert_none_out(result.dtype, reduce_op, reduced_shape) + out_, res_out_ = convert_none_out( + result.dtype, reduce_op, reduced_shape, axis=reduce_args["axis"] + ) # if reduce_op == ReduceOp.CUMULATIVE_SUM: # kahan_sum = np.zeros_like(res_out_) if out is not None: @@ -2177,12 +2173,6 @@ def reduce_slices( # noqa: C901 out = out_ out_init = True - if (reduce_args["axis"] is None and not res_out_init) or ( - np.isscalar(reduce_args["axis"]) and cslice_subidx[reduce_args["axis"]].start == 0 - ): # starting reduction again along axis - res_out_ = _get_res_out(result.shape, reduce_args["axis"], dtype, reduce_op) - res_out_init = True - # Update the output array with the result if reduce_op == ReduceOp.ANY: out[reduced_slice] += result @@ -2191,29 +2181,33 @@ def reduce_slices( # noqa: C901 elif res_out_ is not None: # need lowest index for which optimum attained if reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: - cond = (res_out_ == result) & (result_idx < out[reduced_slice]) - cond |= res_out_ < result if reduce_op == ReduceOp.ARGMAX else res_out_ > result + cond = (res_out_[reduced_slice] == result) & (result_idx < out[reduced_slice]) + cond |= ( + res_out_[reduced_slice] < result + if reduce_op == ReduceOp.ARGMAX + else res_out_[reduced_slice] > result + ) out[reduced_slice] = np.where(cond, result_idx, out[reduced_slice]) - res_out_ = np.where(cond, result, res_out_) + res_out_[reduced_slice] = np.where(cond, result, res_out_[reduced_slice]) else: # CUMULATIVE_SUM or CUMULATIVE_PROD idx_result = tuple( slice(-1, None) if i == reduce_args["axis"] else slice(None, None) for i, c in enumerate(reduced_slice) ) - # idx_lastval = tuple( - # slice(0, 1) if i == reduce_args["axis"] else c for i, c in enumerate(reduced_slice) - # ) + idx_lastval = tuple( + slice(0, 1) if i == reduce_args["axis"] else c for i, c in enumerate(reduced_slice) + ) if reduce_op == ReduceOp.CUMULATIVE_SUM: # use Kahan summation algorithm for better precision # y = 
res_out_[idx_lastval] - kahan_sum[idx_lastval] # t = result + y # kahan_sum[idx_lastval] = ((t - result) - y)[idx_result] # result = t - result += res_out_ + result += res_out_[idx_lastval] else: # CUMULATIVE_PROD - result *= res_out_ + result *= res_out_[idx_lastval] out[reduced_slice] = result - res_out_ = result[idx_result] + res_out_[idx_lastval] = result[idx_result] else: out[reduced_slice] = reduce_op.value(out[reduced_slice], result) @@ -2226,7 +2220,7 @@ def reduce_slices( # noqa: C901 if dtype is None: # We have no hint here, so choose a default dtype dtype = np.float64 - out = convert_none_out(dtype, reduce_op, reduced_shape) + out, _ = convert_none_out(dtype, reduce_op, reduced_shape) out = out[()] if reduced_shape == () else out # undo dummy dim from inside convert_none_out final_mask = tuple(np.where(mask_slice)[0]) @@ -2238,31 +2232,8 @@ def reduce_slices( # noqa: C901 return out -def _get_res_out(reduced_shape, axis, dtype, reduce_op): - reduced_shape = (1,) if reduced_shape == () else reduced_shape - # Get res_out to hold running sums along axes for chunks when doing cumulative sums/prods with axis not None - if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: - temp_shape = tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)) - res_out_ = ( - np.zeros(temp_shape, dtype=dtype) - if reduce_op == ReduceOp.CUMULATIVE_SUM - else np.ones(temp_shape, dtype=dtype) - ) - elif reduce_op in {ReduceOp.ARGMIN, ReduceOp.ARGMAX}: - temp_shape = reduced_shape - res_out_ = np.ones(temp_shape, dtype=dtype) - if np.issubdtype(dtype, np.integer): - res_out_ *= np.iinfo(dtype).max if reduce_op == ReduceOp.ARGMIN else np.iinfo(dtype).min - elif np.issubdtype(dtype, np.bool): - res_out_ = res_out_ if reduce_op == ReduceOp.ARGMIN else np.zeros(temp_shape, dtype=dtype) - else: - res_out_ *= np.inf if reduce_op == ReduceOp.ARGMIN else -np.inf - else: - res_out_ = None - return res_out_ - - -def convert_none_out(dtype, reduce_op, reduced_shape): +def convert_none_out(dtype, reduce_op, reduced_shape, axis=None): + out = None reduced_shape = (1,) if reduced_shape == () else reduced_shape # out will be a proper numpy.ndarray if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_PROD}: @@ -2271,6 +2242,17 @@ def convert_none_out(dtype, reduce_op, reduced_shape): if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM} else np.ones(reduced_shape, dtype=dtype) ) + # Get res_out to hold running sums along axes for chunks when doing cumulative sums/prods with axis not None + if reduce_op in {ReduceOp.SUM, ReduceOp.PROD}: + res_out_ = None + else: + temp_shape = tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)) + res_out_ = ( + np.zeros(temp_shape, dtype=dtype) + if reduce_op == ReduceOp.CUMULATIVE_SUM + else np.ones(temp_shape, dtype=dtype) + ) + out = (out, res_out_) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): out = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype) @@ -2286,8 +2268,15 @@ def convert_none_out(dtype, reduce_op, reduced_shape): elif reduce_op == ReduceOp.ALL: out = np.ones(reduced_shape, dtype=np.bool_) elif reduce_op in {ReduceOp.ARGMIN, ReduceOp.ARGMAX}: - out = np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX) - return out + res_out_ = np.ones(reduced_shape, dtype=dtype) + if np.issubdtype(dtype, np.integer): + res_out_ *= np.iinfo(dtype).max if reduce_op == ReduceOp.ARGMIN else np.iinfo(dtype).min + elif np.issubdtype(dtype, np.bool): + res_out_ = res_out_ if reduce_op == 
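The commented-out block above sketches Kahan (compensated) summation for the cumulative carry. For reference, the textbook form of the algorithm on a plain sequence; this is illustrative only, and the patch keeps its block-wise variant disabled:

    def kahan_sum(values):
        total = 0.0
        comp = 0.0  # running compensation for lost low-order bits
        for v in values:
            y = v - comp            # re-inject the error from the last step
            t = total + y           # low-order bits of y are lost here...
            comp = (t - total) - y  # ...and estimated here for the next step
            total = t
        return total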
ReduceOp.ARGMIN else np.zeros(reduced_shape, dtype=dtype) + else: + res_out_ *= np.inf if reduce_op == ReduceOp.ARGMIN else -np.inf + out = (np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX), res_out_) + return out if isinstance(out, tuple) else (out, None) def chunked_eval( diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index b326aa7b..1edf7d27 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -9,7 +9,6 @@ import builtins import math import warnings -from itertools import product import ndindex import numpy as np @@ -769,65 +768,14 @@ def _get_local_slice(prior_selection, post_selection, chunk_bounds): return locbegin, locend -def get_intersecting_chunks(idx, shape, chunks, axis=None): - if 0 in chunks: # chunk is whole array so just return full tuple to do loop once - return (ndindex.ndindex(...).expand(shape),) - chunk_size = ndindex.ChunkSize(chunks) - if axis is None: - return chunk_size.as_subchunks(idx, shape) # if _slice is (), returns all chunks - - def return_my_it(chunk_size, idx, shape, axis): - # special algorithm to iterate over axis first (adapted from ndindex source) - shape = ndindex.shapetools.asshape(shape) - - idx = ndindex.ndindex(idx).expand(shape) - - iters = [] - idx_args = tuple(a for i, a in enumerate(idx.args) if i != axis) + (idx.args[axis],) - idx_args = iter(idx_args) # iterate over tuple of slices in order - self_ = tuple(a for i, a in enumerate(chunk_size) if i != axis) + (chunk_size[axis],) - self_ = iter(self_) # iterate over chunk_shape in order - while True: - try: - i = next(idx_args) # slice along axis - n = next(self_) # chunklen along dimension - except StopIteration: - break - if not isinstance(i, ndindex.Slice): - raise ValueError("Only slices may be used with axis arg") - - def _slice_iter(s, n): - a, N, m = s.args - if m > n: - yield from ((a + k * m) // n for k in range(ceiling(N - a, m))) - else: - yield from range(a // n, ceiling(N, n)) - - iters.append(_slice_iter(i, n)) - - def _indices(iters): - my_list = [ndindex.Slice(None, None)] * len(chunk_size) - for p in product(*iters): - # p increments over arg axis first before other axes - # p = (...., -1, axis) - my_list[:axis] = [ - ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) - for n, cs, ci in zip(shape[:axis], chunk_size[:axis], p[:axis], strict=True) - ] - n, cs, ci = shape[-1], chunk_size[-1], p[-1] - my_list[axis] = ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) - my_list[axis + 1 :] = [ - ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) - for n, cs, ci in zip(shape[axis:-1], chunk_size[axis:-1], p[axis:-1], strict=True) - ] - yield ndindex.Tuple(*my_list) - - for c in _indices(iters): - # Empty indices should be impossible by the construction of the - # iterators above. 
- yield from c - - return return_my_it(chunk_size, idx, shape, axis) +def get_intersecting_chunks(_slice, shape, chunks): + if 0 not in chunks: + chunk_size = ndindex.ChunkSize(chunks) + return chunk_size.as_subchunks(_slice, shape) # if _slice is (), returns all chunks + else: + return ( + ndindex.ndindex(...).expand(shape), + ) # chunk is whole array so just return full tuple to do loop once def get_chunks_idx(shape, chunks): From d9b1033cdb9ec1055858132a958d6ef0db6520d8 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Sun, 8 Feb 2026 19:32:56 +0100 Subject: [PATCH 7/7] Correct fast_path for arbitrary axis --- src/blosc2/lazyexpr.py | 65 +++++++++--------- src/blosc2/utils.py | 110 ++++++++++++++++++------------- tests/ndarray/test_reductions.py | 21 ++++-- 3 files changed, 110 insertions(+), 86 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 06ff5cd1..0e5b917d 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -51,6 +51,7 @@ from .utils import ( NUMPY_GE_2_0, _get_chunk_operands, + _sliced_chunk_iter, check_smaller_shape, compute_smaller_slice, constructors, @@ -1084,10 +1085,14 @@ def get_chunk(arr, info, nchunk): async def async_read_chunks(arrs, info, queue): loop = asyncio.get_event_loop() - nchunks = arrs[0].schunk.nchunks - + shape, chunks_ = arrs[0].shape, arrs[0].chunks with concurrent.futures.ThreadPoolExecutor() as executor: - for nchunk in range(nchunks): + my_chunk_iter = range(arrs[0].schunk.nchunks) + if len(info) == 5: + if info[-1] is not None: + my_chunk_iter = _sliced_chunk_iter(chunks_, (), shape, axis=info[-1], nchunk=True) + info = info[:4] + for i, nchunk in enumerate(my_chunk_iter): futures = [ (index, loop.run_in_executor(executor, get_chunk, arr, info, nchunk)) for index, arr in enumerate(arrs) @@ -1100,7 +1105,7 @@ async def async_read_chunks(arrs, info, queue): print(f"Exception occurred: {chunk}") raise chunk chunks_sorted.append(chunk) - queue.put((nchunk, chunks_sorted)) # use non-async queue.put() + queue.put((i, chunks_sorted)) # use non-async queue.put() queue.put(None) # signal the end of the chunks @@ -1137,7 +1142,7 @@ def read_nchunk(arrs, info): def fill_chunk_operands( - operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=False + operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=False, axis=None ): """Retrieve the chunk operands for evaluating an expression. 
@@ -1150,12 +1155,12 @@ def fill_chunk_operands( low_mem = os.environ.get("BLOSC_LOW_MEM", False) # This method is only useful when all operands are NDArray and shows better # performance only when at least one of them is persisted on disk - if nchunk == 0: + if iter_chunks is None: # Initialize the iterator for reading the chunks # Take any operand (all should have the same shape and chunks) key, arr = next(iter(operands.items())) chunks_idx, _ = get_chunks_idx(arr.shape, arr.chunks) - info = (reduc, aligned[key], low_mem, chunks_idx) + info = (reduc, aligned[key], low_mem, chunks_idx, axis) iter_chunks = read_nchunk(list(operands.values()), info) # Run the asynchronous file reading function from a synchronous context chunks = next(iter_chunks) @@ -1163,7 +1168,10 @@ def fill_chunk_operands( for i, (key, value) in enumerate(operands.items()): # Chunks are already decompressed, so we can use them directly if not low_mem: - chunk_operands[key] = chunks[i] + if full_chunk: + chunk_operands[key] = chunks[i] + else: + chunk_operands[key] = value[slice_] continue # Otherwise, we need to decompress them if aligned[key]: @@ -1568,10 +1576,12 @@ def slices_eval( # noqa: C901 intersecting_chunks = get_intersecting_chunks( _slice, shape, chunks ) # if _slice is (), returns all chunks + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) - for nchunk, chunk_slice in enumerate(intersecting_chunks): + for chunk_slice in intersecting_chunks: # Check whether current cslice intersects with _slice cslice = chunk_slice.raw + nchunk = builtins.sum([c.start // chunks[i] * np.prod(ratio[i + 1 :]) for i, c in enumerate(cslice)]) if cslice != () and _slice != (): # get intersection of chunk and target cslice = step_handler(cslice, _slice) @@ -1889,9 +1899,9 @@ def reduce_slices( # noqa: C901 if np.any(mask_slice): add_idx = np.cumsum(mask_slice) axis = tuple(a + add_idx[a] for a in axis) # axis now refers to new shape with dummy dims - if reduce_args["axis"] is not None: - # conserve as integer if was not tuple originally - reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis + if reduce_args["axis"] is not None: + # conserve as integer if was not tuple originally + reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: reduced_shape = (np.prod(shape_slice),) if reduce_args["axis"] is None else shape_slice # if reduce_args["axis"] is None, have to have 1D input array; otherwise, ensure positive scalar @@ -2057,18 +2067,17 @@ def reduce_slices( # noqa: C901 # Iterate over the operands and get the chunks chunk_operands = {} # Check which chunks intersect with _slice - # if chunks has 0 we loop once but fast path is false as gives error (schunk has no chunks) - if ( - np.isscalar(reduce_args["axis"]) and not fast_path - ): # iterate over chunks incrementing along reduction axis + if np.isscalar(reduce_args["axis"]): # iterate over chunks incrementing along reduction axis intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks, axis=reduce_args["axis"]) else: # iterate over chunks incrementing along last axis intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) out_init = False res_out_init = False + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) - for nchunk, chunk_slice in enumerate(intersecting_chunks): + for chunk_slice in intersecting_chunks: cslice = chunk_slice.raw + nchunk = builtins.sum([c.start // chunks[i] * 
np.prod(ratio[i + 1 :]) for i, c in enumerate(cslice)])
         # Check whether current cslice intersects with _slice
         if cslice != () and _slice != ():
             # get intersection of chunk and target
             cslice = step_handler(cslice, _slice)
@@ -2077,6 +2086,8 @@ def reduce_slices(  # noqa: C901
         starts = [s.start if s.start is not None else 0 for s in cslice]
         unit_steps = np.all([s.step == 1 for s in cslice])
         cslice_shape = tuple(s.stop - s.start for s in cslice)
+        # get local index of part of out that is to be updated
+        cslice_subidx = ndindex.ndindex(cslice).as_subindex(_slice).raw  # if _slice is (), just gives cslice
 
         if _slice == () and fast_path and unit_steps:
             # Fast path
             full_chunk = cslice_shape == chunks
@@ -2090,12 +2101,11 @@ def reduce_slices(  # noqa: C901
                 iter_disk,
                 chunk_operands,
                 reduc=True,
+                axis=reduce_args["axis"] if np.isscalar(reduce_args["axis"]) else None,
             )
         else:
             _get_chunk_operands(operands, cslice, chunk_operands, shape)
 
-        # get local index of part of out that is to be updated
-        cslice_subidx = ndindex.ndindex(cslice).as_subindex(_slice).raw  # if _slice is (), just gives cslice
         if reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}:
             reduced_slice = (
                 tuple(
@@ -3160,22 +3170,7 @@ def find_args(expr):
 
         return value, expression[idx:idx2]
 
-    def _compute_expr(self, item, kwargs):  # noqa : C901
-        # ne_evaluate will need safe_blosc2_globals for some functions (e.g. clip, logaddexp)
-        # that are implemented in python-blosc2 not in numexpr
-        global safe_blosc2_globals
-        if len(safe_blosc2_globals) == 0:
-            # First eval call, fill blosc2_safe_globals for ne_evaluate
-            safe_blosc2_globals = {"blosc2": blosc2}
-            # Add all first-level blosc2 functions
-            safe_blosc2_globals.update(
-                {
-                    name: getattr(blosc2, name)
-                    for name in dir(blosc2)
-                    if callable(getattr(blosc2, name)) and not name.startswith("_")
-                }
-            )
-
+    def _compute_expr(self, item, kwargs):
         if any(method in self.expression for method in eager_funcs):
             # We have reductions in the expression (probably coming from a string lazyexpr)
             # Also includes slice
diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py
index b326aa7b..ec6602f7 100644
--- a/src/blosc2/utils.py
+++ b/src/blosc2/utils.py
@@ -769,65 +769,81 @@ def _get_local_slice(prior_selection, post_selection, chunk_bounds):
     return locbegin, locend
 
-def get_intersecting_chunks(idx, shape, chunks, axis=None):
-    if 0 in chunks:  # chunk is whole array so just return full tuple to do loop once
-        return (ndindex.ndindex(...).expand(shape),)
-    chunk_size = ndindex.ChunkSize(chunks)
-    if axis is None:
-        return chunk_size.as_subchunks(idx, shape)  # if _slice is (), returns all chunks
-
-    def return_my_it(chunk_size, idx, shape, axis):
-        # special algorithm to iterate over axis first (adapted from ndindex source)
-        shape = ndindex.shapetools.asshape(shape)
+def _sliced_chunk_iter(chunks, idx, shape, axis=None, nchunk=False):
+    """
+    If nchunk is True, return an iterator over chunk numbers instead of chunk slices.
+ """ + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) + idx = ndindex.ndindex(idx).expand(shape) + if axis is not None: + idx = tuple(a for i, a in enumerate(idx.args) if i != axis) + (idx.args[axis],) + chunks_ = tuple(a for i, a in enumerate(chunks) if i != axis) + (chunks[axis],) + else: + chunks_ = chunks + idx_iter = iter(idx) # iterate over tuple of slices in order + chunk_iter = iter(chunks_) # iterate over chunk_shape in order + + iters = [] + while True: + try: + i = next(idx_iter) # slice along axis + n = next(chunk_iter) # chunklen along dimension + except StopIteration: + break + if not isinstance(i, ndindex.Slice): + raise ValueError("Only slices may be used with axis arg") + + def _slice_iter(s, n): + a, N, m = s.args + if m > n: + yield from ((a + k * m) // n for k in range(ceiling(N - a, m))) + else: + yield from range(a // n, ceiling(N, n)) - idx = ndindex.ndindex(idx).expand(shape) + iters.append(_slice_iter(i, n)) - iters = [] - idx_args = tuple(a for i, a in enumerate(idx.args) if i != axis) + (idx.args[axis],) - idx_args = iter(idx_args) # iterate over tuple of slices in order - self_ = tuple(a for i, a in enumerate(chunk_size) if i != axis) + (chunk_size[axis],) - self_ = iter(self_) # iterate over chunk_shape in order - while True: - try: - i = next(idx_args) # slice along axis - n = next(self_) # chunklen along dimension - except StopIteration: - break - if not isinstance(i, ndindex.Slice): - raise ValueError("Only slices may be used with axis arg") - - def _slice_iter(s, n): - a, N, m = s.args - if m > n: - yield from ((a + k * m) // n for k in range(ceiling(N - a, m))) - else: - yield from range(a // n, ceiling(N, n)) - - iters.append(_slice_iter(i, n)) - - def _indices(iters): - my_list = [ndindex.Slice(None, None)] * len(chunk_size) - for p in product(*iters): - # p increments over arg axis first before other axes - # p = (...., -1, axis) + def _indices(iters): + my_list = [ndindex.Slice(None, None)] * len(chunks) + for p in product(*iters): + # p increments over arg axis first before other axes + # p = (...., -1, axis) + if axis is None: + my_list = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape, chunks, p, strict=True) + ] + else: my_list[:axis] = [ ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) - for n, cs, ci in zip(shape[:axis], chunk_size[:axis], p[:axis], strict=True) + for n, cs, ci in zip(shape[:axis], chunks[:axis], p[:axis], strict=True) ] - n, cs, ci = shape[-1], chunk_size[-1], p[-1] + n, cs, ci = shape[axis], chunks[axis], p[-1] my_list[axis] = ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) my_list[axis + 1 :] = [ ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) - for n, cs, ci in zip(shape[axis:-1], chunk_size[axis:-1], p[axis:-1], strict=True) + for n, cs, ci in zip(shape[axis + 1 :], chunks[axis + 1 :], p[axis:-1], strict=True) ] + if nchunk: + yield builtins.sum( + [c.start // chunks[i] * np.prod(ratio[i + 1 :]) for i, c in enumerate(my_list)] + ) + else: yield ndindex.Tuple(*my_list) - for c in _indices(iters): - # Empty indices should be impossible by the construction of the - # iterators above. 
-            yield from c
+
+    yield from _indices(iters)
+
+
+def get_intersecting_chunks(idx, shape, chunks, axis=None):
+    if len(chunks) != len(shape):
+        raise ValueError("chunks must be same length as shape!")
+    if 0 in chunks:  # chunk is whole array so just return full tuple to do loop once
+        return (ndindex.ndindex(...).expand(shape),)
+    chunk_size = ndindex.ChunkSize(chunks)
+    if axis is None:
+        return chunk_size.as_subchunks(idx, shape)  # if _slice is (), returns all chunks
 
-    return return_my_it(chunk_size, idx, shape, axis)
+    # special algorithm to iterate over axis first (adapted from ndindex source)
+    return _sliced_chunk_iter(chunks, idx, shape, axis)
 
 
 def get_chunks_idx(shape, chunks):
diff --git a/tests/ndarray/test_reductions.py b/tests/ndarray/test_reductions.py
index 1ab9d41f..add4f911 100644
--- a/tests/ndarray/test_reductions.py
+++ b/tests/ndarray/test_reductions.py
@@ -373,15 +373,15 @@ def test_reduce_slice(reduce_op):
 @pytest.mark.parametrize(
     ("chunks", "blocks"),
     [
+        ((10, 50, 70), (10, 25, 50)),
         ((20, 50, 100), (10, 50, 100)),
-        ((10, 25, 70), (10, 25, 50)),
         ((10, 50, 100), (6, 25, 75)),
         ((15, 30, 75), (7, 20, 50)),
-        ((20, 50, 100), (10, 50, 60)),
+        ((1, 50, 100), (1, 50, 60)),
     ],
 )
 @pytest.mark.parametrize("disk", [True, False])
-@pytest.mark.parametrize("fill_value", [0, 1, 0.32])
+@pytest.mark.parametrize("fill_value", [1, 0, 0.32])
 @pytest.mark.parametrize(
     "reduce_op",
     [
         "sum",
         "prod",
         "min",
         "max",
         "any",
         "all",
         "mean",
         "std",
         "var",
         "argmax",
         "argmin",
         "cumulative_sum",
         "cumulative_prod",
     ],
 )
-@pytest.mark.parametrize("axis", [0, 1, None])
+@pytest.mark.parametrize("axis", [None, 0, 1])
 def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis):
     shape = (20, 50, 100)
     urlpath = "a1.b2nd" if disk else None
@@ -420,6 +420,19 @@ def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis):
     res = getattr(a, reduce_op)(axis=axis)
     assert np.allclose(res, nres)
 
+    # Try with a slice
+    b = blosc2.arange(0, np.prod(shape), blocks=blocks, chunks=chunks, shape=shape)
+    nb = b[:]
+    slice_ = (slice(10, 20),)
+    if reduce_op in {"cumulative_sum", "cumulative_prod"}:
+        axis = 0 if axis is None else axis
+        oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod"
+        nres = eval(f"{oploc}((na + nb)[{slice_}], axis={axis})")
+    else:
+        nres = getattr((na + nb)[slice_], reduce_op)(axis=axis)
+    res = getattr(a + b, reduce_op)(axis=axis, item=slice_)
+    assert np.allclose(res, nres)
 
 
 @pytest.mark.parametrize("disk", [True, False])
 @pytest.mark.parametrize("fill_value", [0, 1, 0.32])
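Two pieces of arithmetic from _sliced_chunk_iter are easier to see in isolation: recovering a chunk's linear (C-order) number from its slice starts, and walking the chunk grid with the chosen axis varying fastest. A self-contained sketch with hypothetical helper names, not the library API:

    import math
    from itertools import product

    def chunk_linear_index(starts, shape, chunks):
        # Row-major index of the chunk whose slice begins at `starts`,
        # mirroring the builtins.sum(...) expression used in the patch.
        ratio = [math.ceil(s / c) for s, c in zip(shape, chunks)]  # chunks per dim
        return sum((st // c) * math.prod(ratio[i + 1:])
                   for i, (st, c) in enumerate(zip(starts, chunks)))

    def axis_first_chunk_coords(nchunks, axis):
        # Visit every chunk coordinate with `axis` varying fastest, the order
        # needed so each run along the reduction axis is consumed before
        # moving on to the next run.
        dims = [n for i, n in enumerate(nchunks) if i != axis] + [nchunks[axis]]
        for p in product(*map(range, dims)):
            coord = list(p[:-1])
            coord.insert(axis, p[-1])
            yield tuple(coord)

    # chunk_linear_index((10, 25, 50), (20, 50, 100), (10, 25, 50)) -> 7
    # list(axis_first_chunk_coords((2, 2), axis=0)) -> (0,0), (1,0), (0,1), (1,1)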