diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 565d97a6..8b9e5500 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -13,6 +13,7 @@ import builtins import concurrent.futures import copy +import enum import inspect import linecache import math @@ -49,6 +50,10 @@ from .proxy import _convert_dtype from .utils import ( NUMPY_GE_2_0, + _get_chunk_operands, + _sliced_chunk_iter, + check_smaller_shape, + compute_smaller_slice, constructors, elementwise_funcs, get_chunks_idx, @@ -56,6 +61,8 @@ infer_shape, linalg_attrs, linalg_funcs, + npcumprod, + npcumsum, npvecdot, process_key, reducers, @@ -90,6 +97,8 @@ safe_numpy_globals["concat"] = np.concatenate safe_numpy_globals["matrix_transpose"] = np.transpose safe_numpy_globals["vecdot"] = npvecdot + safe_numpy_globals["cumulative_sum"] = npcumsum + safe_numpy_globals["cumulative_prod"] = npcumprod # Set this to False if miniexpr should not be tried out try_miniexpr = True @@ -137,6 +146,45 @@ def ne_evaluate(expression, local_dict=None, **kwargs): return res[()] if isinstance(res, blosc2.Operand) else res +def _get_result(expression, chunk_operands, ne_args, where=None, indices=None, _order=None): + chunk_indices = None + if (expression == "o0" or expression == "(o0)") and where is None: + # We don't have an actual expression, so avoid a copy except to make contiguous (later) + return chunk_operands["o0"], None + # Apply the where condition (in result) + if where is not None and len(where) == 2: + # x = chunk_operands["_where_x"] + # y = chunk_operands["_where_y"] + # result = np.where(result, x, y) + # numexpr is a bit faster than np.where, and we can fuse operations in this case + new_expr = f"where({expression}, _where_x, _where_y)" + return ne_evaluate(new_expr, chunk_operands, **ne_args), None + + result = ne_evaluate(expression, chunk_operands, **ne_args) + if where is None: + return result, None + elif len(where) == 1: + x = chunk_operands["_where_x"] + if (indices is not None) or (_order is not None): + # Return indices only makes sense when the where condition is a tuple with one element + # and result is a boolean array + if len(x.shape) > 1: + raise ValueError("indices() and sort() only support 1D arrays") + if result.dtype != np.bool_: + raise ValueError("indices() and sort() only support bool conditions") + if _order: + # We need to cumulate all the fields in _order, as well as indices + chunk_indices = indices[result] + result = x[_order][result] + else: + chunk_indices = None + result = indices[result] + return result, chunk_indices + else: + return x[result], None + raise ValueError("The where condition must be a tuple with one or two elements") + + # Define empty ndindex tuple for function defaults NDINDEX_EMPTY_TUPLE = ndindex.Tuple() @@ -229,23 +277,27 @@ class ReduceOp(Enum): Available reduce operations. """ - SUM = np.add - PROD = np.multiply - MEAN = np.mean - STD = np.std - VAR = np.var + # wrap as enum.member so that Python doesn't treat some funcs + # as class methods (rather than Enum members) + SUM = enum.member(np.add) + PROD = enum.member(np.multiply) + MEAN = enum.member(np.mean) + STD = enum.member(np.std) + VAR = enum.member(np.var) # Computing a median from partial results is not straightforward because the median # is a positional statistic, which means it depends on the relative ordering of all # the data points. 
Unlike statistics such as the sum or mean, you can't compute a median # from partial results without knowing the entire dataset, and this is way too expensive # for arrays that cannot typically fit in-memory (e.g. disk-based NDArray). # MEDIAN = np.median - MAX = np.maximum - MIN = np.minimum - ANY = np.any - ALL = np.all - ARGMAX = np.argmax - ARGMIN = np.argmin + MAX = enum.member(np.maximum) + MIN = enum.member(np.minimum) + ANY = enum.member(np.any) + ALL = enum.member(np.all) + ARGMAX = enum.member(np.argmax) + ARGMIN = enum.member(np.argmin) + CUMULATIVE_SUM = enum.member(npcumsum) + CUMULATIVE_PROD = enum.member(npcumprod) class LazyArrayEnum(Enum): @@ -522,79 +574,6 @@ def compute_broadcast_shape(arrays): return np.broadcast_shapes(*shapes) if shapes else None -def check_smaller_shape(value_shape, shape, slice_shape, slice_): - """Check whether the shape of the value is smaller than the shape of the array. - - This follows the NumPy broadcasting rules. - """ - # slice_shape must be as long as shape - if len(slice_shape) != len(slice_): - raise ValueError("slice_shape must be as long as slice_") - no_nones_shape = tuple(sh for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) - no_nones_slice = tuple(s for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) - is_smaller_shape = any( - s > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_shape) - ) - slice_past_bounds = any( - s.stop > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_slice) - ) - return len(value_shape) < len(shape) or is_smaller_shape or slice_past_bounds - - -def _compute_smaller_slice(larger_shape, smaller_shape, larger_slice): - smaller_slice = [] - diff_dims = len(larger_shape) - len(smaller_shape) - - for i in range(len(larger_shape)): - if i < diff_dims: - # For leading dimensions of the larger array that the smaller array doesn't have, - # we don't add anything to the smaller slice - pass - else: - # For dimensions that both arrays have, the slice for the smaller array should be - # the same as the larger array unless the smaller array's size along that dimension - # is 1, in which case we use None to indicate the full slice - if smaller_shape[i - diff_dims] != 1: - smaller_slice.append(larger_slice[i]) - else: - smaller_slice.append(slice(0, larger_shape[i])) - - return tuple(smaller_slice) - - -# A more compact version of the function above, albeit less readable -def compute_smaller_slice(larger_shape, smaller_shape, larger_slice): - """ - Returns the slice of the smaller array that corresponds to the slice of the larger array. 
- """ - j_small = len(smaller_shape) - 1 - j_large = len(larger_shape) - 1 - smaller_shape_nones = [] - larger_shape_nones = [] - for s in reversed(larger_slice): - if s is None: - smaller_shape_nones.append(1) - larger_shape_nones.append(1) - else: - if j_small >= 0: - smaller_shape_nones.append(smaller_shape[j_small]) - j_small -= 1 - if j_large >= 0: - larger_shape_nones.append(larger_shape[j_large]) - j_large -= 1 - smaller_shape_nones.reverse() - larger_shape_nones.reverse() - diff_dims = len(larger_shape_nones) - len(smaller_shape_nones) - return tuple( - None - if larger_slice[i] is None - else ( - larger_slice[i] if smaller_shape_nones[i - diff_dims] != 1 else slice(0, larger_shape_nones[i]) - ) - for i in range(diff_dims, len(larger_shape_nones)) - ) - - # Define the patterns for validation validation_patterns = [ r"[\;]", # Flow control characters @@ -1106,10 +1085,14 @@ def get_chunk(arr, info, nchunk): async def async_read_chunks(arrs, info, queue): loop = asyncio.get_event_loop() - nchunks = arrs[0].schunk.nchunks - + shape, chunks_ = arrs[0].shape, arrs[0].chunks with concurrent.futures.ThreadPoolExecutor() as executor: - for nchunk in range(nchunks): + my_chunk_iter = range(arrs[0].schunk.nchunks) + if len(info) == 5: + if info[-1] is not None: + my_chunk_iter = _sliced_chunk_iter(chunks_, (), shape, axis=info[-1], nchunk=True) + info = info[:4] + for i, nchunk in enumerate(my_chunk_iter): futures = [ (index, loop.run_in_executor(executor, get_chunk, arr, info, nchunk)) for index, arr in enumerate(arrs) @@ -1122,7 +1105,7 @@ async def async_read_chunks(arrs, info, queue): print(f"Exception occurred: {chunk}") raise chunk chunks_sorted.append(chunk) - queue.put((nchunk, chunks_sorted)) # use non-async queue.put() + queue.put((i, chunks_sorted)) # use non-async queue.put() queue.put(None) # signal the end of the chunks @@ -1159,7 +1142,7 @@ def read_nchunk(arrs, info): def fill_chunk_operands( - operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=False + operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=False, axis=None ): """Retrieve the chunk operands for evaluating an expression. 
@@ -1172,12 +1155,12 @@ def fill_chunk_operands( low_mem = os.environ.get("BLOSC_LOW_MEM", False) # This method is only useful when all operands are NDArray and shows better # performance only when at least one of them is persisted on disk - if nchunk == 0: + if iter_chunks is None: # Initialize the iterator for reading the chunks # Take any operand (all should have the same shape and chunks) key, arr = next(iter(operands.items())) chunks_idx, _ = get_chunks_idx(arr.shape, arr.chunks) - info = (reduc, aligned[key], low_mem, chunks_idx) + info = (reduc, aligned[key], low_mem, chunks_idx, axis) iter_chunks = read_nchunk(list(operands.values()), info) # Run the asynchronous file reading function from a synchronous context chunks = next(iter_chunks) @@ -1185,7 +1168,10 @@ def fill_chunk_operands( for i, (key, value) in enumerate(operands.items()): # Chunks are already decompressed, so we can use them directly if not low_mem: - chunk_operands[key] = chunks[i] + if full_chunk: + chunk_operands[key] = chunks[i] + else: + chunk_operands[key] = value[slice_] continue # Otherwise, we need to decompress them if aligned[key]: @@ -1590,14 +1576,16 @@ def slices_eval( # noqa: C901 intersecting_chunks = get_intersecting_chunks( _slice, shape, chunks ) # if _slice is (), returns all chunks + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) - for nchunk, chunk_slice in enumerate(intersecting_chunks): - # get intersection of chunk and target - if _slice != (): - cslice = step_handler(chunk_slice.raw, _slice) - else: - cslice = chunk_slice.raw - + for chunk_slice in intersecting_chunks: + # Check whether current cslice intersects with _slice + cslice = chunk_slice.raw + nchunk = builtins.sum([c.start // chunks[i] * np.prod(ratio[i + 1 :]) for i, c in enumerate(cslice)]) + if cslice != () and _slice != (): + # get intersection of chunk and target + cslice = step_handler(cslice, _slice) + offset = tuple(s.start for s in cslice) # offset for the udf cslice_shape = tuple(s.stop - s.start for s in cslice) len_chunk = math.prod(cslice_shape) # get local index of part of out that is to be updated @@ -1605,35 +1593,7 @@ def slices_eval( # noqa: C901 ndindex.ndindex(cslice).as_subindex(_slice).raw ) # in the case _slice=(), just gives cslice - # Get the starts and stops for the slice - starts = [s.start if s.start is not None else 0 for s in cslice] - stops = [s.stop if s.stop is not None else sh for s, sh in zip(cslice, cslice_shape, strict=True)] - unit_steps = np.all([s.step == 1 for s in cslice]) - - # Get the slice of each operand - for key, value in operands.items(): - if np.isscalar(value): - chunk_operands[key] = value - continue - if value.shape == (): - chunk_operands[key] = value[()] - continue - if check_smaller_shape(value.shape, shape, cslice_shape, cslice): - # We need to fetch the part of the value that broadcasts with the operand - smaller_slice = compute_smaller_slice(shape, value.shape, cslice) - chunk_operands[key] = value[smaller_slice] - continue - # If key is in operands, we can reuse the buffer - if ( - key in chunk_operands - and cslice_shape == chunk_operands[key].shape - and isinstance(value, blosc2.NDArray) - and unit_steps - ): - value.get_slice_numpy(chunk_operands[key], (starts, stops)) - continue - - chunk_operands[key] = value[cslice] + _get_chunk_operands(operands, cslice, chunk_operands, shape) if out is None: shape_ = shape_slice if shape_slice is not None else shape @@ -1664,47 +1624,16 @@ def slices_eval( # noqa: C901 result = np.empty(cslice_shape, 
dtype=out.dtype) # raises error if out is None # cslice should be equal to cslice_subidx # Call the udf directly and use result as the output array - offset = tuple(s.start for s in cslice) expression(tuple(chunk_operands.values()), result, offset=offset) out[cslice_subidx] = result continue - if where is None: - result = ne_evaluate(expression, chunk_operands, **ne_args) + if _indices or _order: + indices = np.arange(leninputs, leninputs + len_chunk, dtype=np.int64).reshape(cslice_shape) + leninputs += len_chunk + result, chunk_indices = _get_result(expression, chunk_operands, ne_args, where, indices, _order) else: - # Apply the where condition (in result) - if len(where) == 2: - # x = chunk_operands["_where_x"] - # y = chunk_operands["_where_y"] - # result = np.where(result, x, y) - # numexpr is a bit faster than np.where, and we can fuse operations in this case - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, chunk_operands, **ne_args) - elif len(where) == 1: - result = ne_evaluate(expression, chunk_operands, **ne_args) - if _indices or _order: - # Return indices only makes sense when the where condition is a tuple with one element - # and result is a boolean array - x = chunk_operands["_where_x"] - if len(x.shape) > 1: - raise ValueError("indices() and sort() only support 1D arrays") - if result.dtype != np.bool_: - raise ValueError("indices() and sort() only support bool conditions") - indices = np.arange(leninputs, leninputs + len_chunk, dtype=np.int64).reshape( - cslice_shape - ) - if _order: - # We need to cumulate all the fields in _order, as well as indices - chunk_indices = indices[result] - result = x[_order][result] - else: - result = indices[result] - leninputs += len_chunk - else: - x = chunk_operands["_where_x"] - result = x[result] - else: - raise ValueError("The where condition must be a tuple with one or two elements") + result, _ = _get_result(expression, chunk_operands, ne_args, where) # Enforce contiguity of result (necessary to fill the out array) # but avoid copy if already contiguous result = np.require(result, requirements="C") @@ -1811,9 +1740,9 @@ def slices_eval_getitem( shape = out.shape # compute the shape of the output array - _slice_bcast = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in _slice.raw) - slice_shape = ndindex.ndindex(_slice_bcast).newshape(shape) # includes dummy dimensions _slice = _slice.raw + _slice_bcast = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in _slice) + slice_shape = ndindex.ndindex(_slice_bcast).newshape(shape) # includes dummy dimensions # Get the slice of each operand slice_operands = {} @@ -1838,12 +1767,7 @@ def slices_eval_getitem( result = np.empty(slice_shape, dtype=dtype) expression(tuple(slice_operands.values()), result, offset=offset) else: - if where is None: - result = ne_evaluate(expression, slice_operands, **ne_args) - else: - # Apply the where condition (in result) - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, slice_operands, **ne_args) + result, _ = _get_result(expression, slice_operands, ne_args, where) if out is None: # avoid copying unnecessarily try: @@ -1863,7 +1787,7 @@ def infer_reduction_dtype(dtype, operation): my_float = np.result_type( dtype, np.float32 if dtype in (np.float32, np.complex64) else blosc2.DEFAULT_FLOAT ) - if operation in {ReduceOp.SUM, ReduceOp.PROD}: + if operation in {ReduceOp.SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: if np.issubdtype(dtype, 
np.bool_): return np.int64 if np.issubdtype(dtype, np.unsignedinteger): @@ -1945,7 +1869,8 @@ def reduce_slices( # noqa: C901 reduce_op = reduce_args.pop("op") reduce_op_str = reduce_args.pop("op_str", None) axis = reduce_args["axis"] - keepdims = reduce_args["keepdims"] + keepdims = reduce_args.get("keepdims", False) + include_initial = reduce_args.pop("include_initial", False) dtype = reduce_args.get("dtype", None) if dtype is None: dtype = kwargs.pop("dtype", None) @@ -1974,14 +1899,23 @@ def reduce_slices( # noqa: C901 if np.any(mask_slice): add_idx = np.cumsum(mask_slice) axis = tuple(a + add_idx[a] for a in axis) # axis now refers to new shape with dummy dims - if reduce_args["axis"] is not None: - # conserve as integer if was not tuple originally - reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis - if keepdims: - reduced_shape = tuple(1 if i in axis else s for i, s in enumerate(shape_slice)) + if reduce_args["axis"] is not None: + # conserve as integer if was not tuple originally + reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis + if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + reduced_shape = (np.prod(shape_slice),) if reduce_args["axis"] is None else shape_slice + # if reduce_args["axis"] is None, have to have 1D input array; otherwise, ensure positive scalar + reduce_args["axis"] = 0 if reduce_args["axis"] is None else axis[0] + if include_initial: + reduced_shape = tuple( + s + 1 if i == reduce_args["axis"] else s for i, s in enumerate(shape_slice) + ) else: - reduced_shape = tuple(s for i, s in enumerate(shape_slice) if i not in axis) - mask_slice = mask_slice[[i for i in range(len(mask_slice)) if i not in axis]] + if keepdims: + reduced_shape = tuple(1 if i in axis else s for i, s in enumerate(shape_slice)) + else: + reduced_shape = tuple(s for i, s in enumerate(shape_slice) if i not in axis) + mask_slice = mask_slice[[i for i in range(len(mask_slice)) if i not in axis]] if out is not None and reduced_shape != out.shape: raise ValueError("Provided output shape does not match the reduced shape.") @@ -2046,7 +1980,7 @@ def reduce_slices( # noqa: C901 use_miniexpr = False # Some reductions are not supported yet in miniexpr - if reduce_op in (ReduceOp.ARGMAX, ReduceOp.ARGMIN): + if reduce_op in (ReduceOp.ARGMAX, ReduceOp.ARGMIN, ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM): use_miniexpr = False # Check whether we can use miniexpr @@ -2081,9 +2015,9 @@ def reduce_slices( # noqa: C901 nblocks = res_eval.nbytes // res_eval.blocksize # Initialize aux_reduc based on the reduction operation # Padding blocks won't be written, so initial values matter for the final reduction - if reduce_op in {ReduceOp.SUM, ReduceOp.ANY}: + if reduce_op in {ReduceOp.SUM, ReduceOp.ANY, ReduceOp.CUMULATIVE_SUM}: aux_reduc = np.zeros(nblocks, dtype=dtype) - elif reduce_op in {ReduceOp.PROD, ReduceOp.ALL}: + elif reduce_op in {ReduceOp.PROD, ReduceOp.ALL, ReduceOp.CUMULATIVE_PROD}: aux_reduc = np.ones(nblocks, dtype=dtype) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): @@ -2124,10 +2058,8 @@ def reduce_slices( # noqa: C901 # (continue to the manual chunked evaluation below) pass else: - if reduce_op == ReduceOp.ANY: - result = np.any(aux_reduc, **reduce_args) - elif reduce_op == ReduceOp.ALL: - result = np.all(aux_reduc, **reduce_args) + if reduce_op in {ReduceOp.ANY, ReduceOp.ALL}: + result = reduce_op.value(aux_reduc, **reduce_args) else: result = reduce_op.value.reduce(aux_reduc, **reduce_args) return 
result @@ -2135,66 +2067,65 @@ def reduce_slices( # noqa: C901 # Iterate over the operands and get the chunks chunk_operands = {} # Check which chunks intersect with _slice - # if chunks has 0 we loop once but fast path is false as gives error (schunk has no chunks) - intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) + if np.isscalar(reduce_args["axis"]): # iterate over chunks incrementing along reduction axis + intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks, axis=reduce_args["axis"]) + else: # iterate over chunks incrementing along last axis + intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) out_init = False + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) - for nchunk, chunk_slice in enumerate(intersecting_chunks): + for chunk_slice in intersecting_chunks: cslice = chunk_slice.raw - offset = tuple(s.start for s in cslice) # offset for the udf + nchunk = builtins.sum([c.start // chunks[i] * np.prod(ratio[i + 1 :]) for i, c in enumerate(cslice)]) # Check whether current cslice intersects with _slice if cslice != () and _slice != (): # get intersection of chunk and target cslice = step_handler(cslice, _slice) - chunks_ = tuple(s.stop - s.start for s in cslice) - unit_steps = np.all([s.step == 1 for s in cslice]) - # Starts for slice + offset = tuple(s.start for s in cslice) # offset for the udf starts = [s.start if s.start is not None else 0 for s in cslice] + unit_steps = np.all([s.step == 1 for s in cslice]) + cslice_shape = tuple(s.stop - s.start for s in cslice) + # get local index of part of out that is to be updated + cslice_subidx = ndindex.ndindex(cslice).as_subindex(_slice).raw # if _slice is (), just gives cslice if _slice == () and fast_path and unit_steps: # Fast path - full_chunk = chunks_ == chunks + full_chunk = cslice_shape == chunks fill_chunk_operands( - operands, cslice, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=True + operands, + cslice, + cslice_shape, + full_chunk, + aligned, + nchunk, + iter_disk, + chunk_operands, + reduc=True, + axis=reduce_args["axis"] if np.isscalar(reduce_args["axis"]) else None, ) else: - # Get the stops for the slice - stops = [s.stop if s.stop is not None else sh for s, sh in zip(cslice, chunks_, strict=True)] - # Get the slice of each operand - for key, value in operands.items(): - if np.isscalar(value): - chunk_operands[key] = value - continue - if value.shape == (): - chunk_operands[key] = value[()] - continue - if check_smaller_shape(value.shape, shape, chunks_, cslice): - # We need to fetch the part of the value that broadcasts with the operand - smaller_slice = compute_smaller_slice(operand.shape, value.shape, cslice) - chunk_operands[key] = value[smaller_slice] - continue - # If key is in operands, we can reuse the buffer - if ( - key in chunk_operands - and chunks_ == chunk_operands[key].shape - and isinstance(value, blosc2.NDArray) - and unit_steps - ): - value.get_slice_numpy(chunk_operands[key], (starts, stops)) - continue - chunk_operands[key] = value[cslice] + _get_chunk_operands(operands, cslice, chunk_operands, shape) - # get local index of part of out that is to be updated - cslice_subidx = ndindex.ndindex(cslice).as_subindex(_slice).raw # if _slice is (), just gives cslice - if keepdims: - reduced_slice = tuple(slice(None) if i in axis else sl for i, sl in enumerate(cslice_subidx)) + if reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}: + reduced_slice = ( + tuple( + slice(sl.start + 1, sl.stop + 1, 
sl.step) if i == reduce_args["axis"] else sl + for i, sl in enumerate(cslice_subidx) + ) + if include_initial + else cslice_subidx + ) else: - reduced_slice = tuple(sl for i, sl in enumerate(cslice_subidx) if i not in axis) + reduced_slice = ( + tuple(slice(None) if i in axis else sl for i, sl in enumerate(cslice_subidx)) + if keepdims + else tuple(sl for i, sl in enumerate(cslice_subidx) if i not in axis) + ) # Evaluate and reduce the expression using chunks of operands if callable(expression): # TODO: Implement the reductions for UDFs (and test them) - result = np.empty(chunks_, dtype=out.dtype) + result = np.empty(cslice_shape, dtype=out.dtype) expression(tuple(chunk_operands.values()), result, offset=offset) # Reduce the result result = reduce_op.value.reduce(result, **reduce_args) @@ -2202,37 +2133,20 @@ def reduce_slices( # noqa: C901 out[reduced_slice] = reduce_op.value(out[reduced_slice], result) continue - if where is None: - if expression in {"o0", "(o0)"}: - # We don't have an actual expression, so avoid a copy except to make contiguous - result = np.require(chunk_operands["o0"], requirements="C") - else: - result = ne_evaluate(expression, chunk_operands, **ne_args) - else: - # Apply the where condition (in result) - if len(where) == 2: - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, chunk_operands, **ne_args) - elif len(where) == 1: - result = ne_evaluate(expression, chunk_operands, **ne_args) - x = chunk_operands["_where_x"] - result = x[result] - else: - raise NotImplementedError( - "A where condition with no params in combination with reductions is not supported yet" - ) + result, _ = _get_result(expression, chunk_operands, ne_args, where) + # Enforce contiguity of result (necessary to fill the out array) + # but avoid copy if already contiguous + result = np.require(result, requirements="C") # Reduce the result if result.shape == (): if reduce_op == ReduceOp.SUM and result[()] == 0: # Avoid a reduction when result is a zero scalar. Faster for sparse data. 
continue - # Note that chunks_ refers to slice of operand chunks, not reduced_slice - result = np.full(chunks_, result[()]) - if reduce_op == ReduceOp.ANY: - result = np.any(result, **reduce_args) - elif reduce_op == ReduceOp.ALL: - result = np.all(result, **reduce_args) + # Note that cslice_shape refers to slice of operand chunks, not reduced_slice + result = np.full(cslice_shape, result[()]) + if reduce_op in {ReduceOp.ANY, ReduceOp.ALL, ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + result = reduce_op.value(result, **reduce_args) elif reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: # offset for start of slice slice_ref = ( @@ -2243,14 +2157,10 @@ def reduce_slices( # noqa: C901 for s, sl in zip(starts, _slice, strict=True) ] ) - result_idx = ( - np.argmin(result, **reduce_args) - if reduce_op == ReduceOp.ARGMIN - else np.argmax(result, **reduce_args) - ) + result_idx = reduce_op.value(result, **reduce_args) if reduce_args["axis"] is None: # indexing into flattened array result = result[np.unravel_index(result_idx, shape=result.shape)] - idx_within_cslice = np.unravel_index(result_idx, shape=chunks_) + idx_within_cslice = np.unravel_index(result_idx, shape=cslice_shape) result_idx = np.ravel_multi_index( tuple(o + i for o, i in zip(slice_ref, idx_within_cslice, strict=True)), shape_slice ) @@ -2266,7 +2176,11 @@ def reduce_slices( # noqa: C901 result = reduce_op.value.reduce(result, **reduce_args) if not out_init: - out_, res_out_ = convert_none_out(result.dtype, reduce_op, reduced_shape) + out_, res_out_ = convert_none_out( + result.dtype, reduce_op, reduced_shape, axis=reduce_args["axis"] + ) + # if reduce_op == ReduceOp.CUMULATIVE_SUM: + # kahan_sum = np.zeros_like(res_out_) if out is not None: out[:] = out_ del out_ @@ -2279,24 +2193,38 @@ def reduce_slices( # noqa: C901 out[reduced_slice] += result elif reduce_op == ReduceOp.ALL: out[reduced_slice] *= result - elif res_out_ is not None: # i.e. 
ReduceOp.ARGMAX or ReduceOp.ARGMIN + elif res_out_ is not None: # need lowest index for which optimum attained - cond = (res_out_[reduced_slice] == result) & (result_idx < out[reduced_slice]) - if reduce_op == ReduceOp.ARGMAX: - cond |= res_out_[reduced_slice] < result - else: # ARGMIN - cond |= res_out_[reduced_slice] > result - if reduced_slice == (): - out = np.where(cond, result_idx, out[reduced_slice]) - res_out_ = np.where(cond, result, res_out_[reduced_slice]) - else: + if reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: + cond = (res_out_[reduced_slice] == result) & (result_idx < out[reduced_slice]) + cond |= ( + res_out_[reduced_slice] < result + if reduce_op == ReduceOp.ARGMAX + else res_out_[reduced_slice] > result + ) out[reduced_slice] = np.where(cond, result_idx, out[reduced_slice]) res_out_[reduced_slice] = np.where(cond, result, res_out_[reduced_slice]) + else: # CUMULATIVE_SUM or CUMULATIVE_PROD + idx_result = tuple( + slice(-1, None) if i == reduce_args["axis"] else slice(None, None) + for i, c in enumerate(reduced_slice) + ) + idx_lastval = tuple( + slice(0, 1) if i == reduce_args["axis"] else c for i, c in enumerate(reduced_slice) + ) + if reduce_op == ReduceOp.CUMULATIVE_SUM: + # use Kahan summation algorithm for better precision + # y = res_out_[idx_lastval] - kahan_sum[idx_lastval] + # t = result + y + # kahan_sum[idx_lastval] = ((t - result) - y)[idx_result] + # result = t + result += res_out_[idx_lastval] + else: # CUMULATIVE_PROD + result *= res_out_[idx_lastval] + out[reduced_slice] = result + res_out_[idx_lastval] = result[idx_result] else: - if reduced_slice == (): - out = reduce_op.value(out, result) - else: - out[reduced_slice] = reduce_op.value(out[reduced_slice], result) + out[reduced_slice] = reduce_op.value(out[reduced_slice], result) # No longer need res_out_ del res_out_ @@ -2309,6 +2237,7 @@ def reduce_slices( # noqa: C901 dtype = np.float64 out, _ = convert_none_out(dtype, reduce_op, reduced_shape) + out = out[()] if reduced_shape == () else out # undo dummy dim from inside convert_none_out final_mask = tuple(np.where(mask_slice)[0]) if np.any(mask_slice): # remove dummy dims out = np.squeeze(out, axis=final_mask) @@ -2318,13 +2247,27 @@ def reduce_slices( # noqa: C901 return out -def convert_none_out(dtype, reduce_op, reduced_shape): +def convert_none_out(dtype, reduce_op, reduced_shape, axis=None): out = None + reduced_shape = (1,) if reduced_shape == () else reduced_shape # out will be a proper numpy.ndarray - if reduce_op == ReduceOp.SUM: - out = np.zeros(reduced_shape, dtype=dtype) - elif reduce_op == ReduceOp.PROD: - out = np.ones(reduced_shape, dtype=dtype) + if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_PROD}: + out = ( + np.zeros(reduced_shape, dtype=dtype) + if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM} + else np.ones(reduced_shape, dtype=dtype) + ) + # Get res_out to hold running sums along axes for chunks when doing cumulative sums/prods with axis not None + if reduce_op in {ReduceOp.SUM, ReduceOp.PROD}: + res_out_ = None + else: + temp_shape = tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)) + res_out_ = ( + np.zeros(temp_shape, dtype=dtype) + if reduce_op == ReduceOp.CUMULATIVE_SUM + else np.ones(temp_shape, dtype=dtype) + ) + out = (out, res_out_) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): out = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype) @@ -2339,17 +2282,14 @@ def convert_none_out(dtype, reduce_op, reduced_shape): out = 
np.zeros(reduced_shape, dtype=np.bool_) elif reduce_op == ReduceOp.ALL: out = np.ones(reduced_shape, dtype=np.bool_) - elif reduce_op == ReduceOp.ARGMIN: - if np.issubdtype(dtype, np.integer): - res_out_ = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype) - else: - res_out_ = np.inf * np.ones(reduced_shape, dtype=dtype) - out = (np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX), res_out_) - elif reduce_op == ReduceOp.ARGMAX: + elif reduce_op in {ReduceOp.ARGMIN, ReduceOp.ARGMAX}: + res_out_ = np.ones(reduced_shape, dtype=dtype) if np.issubdtype(dtype, np.integer): - res_out_ = np.iinfo(dtype).min * np.ones(reduced_shape, dtype=dtype) + res_out_ *= np.iinfo(dtype).max if reduce_op == ReduceOp.ARGMIN else np.iinfo(dtype).min + elif np.issubdtype(dtype, np.bool): + res_out_ = res_out_ if reduce_op == ReduceOp.ARGMIN else np.zeros(reduced_shape, dtype=dtype) else: - res_out_ = -np.inf * np.ones(reduced_shape, dtype=dtype) + res_out_ *= np.inf if reduce_op == ReduceOp.ARGMIN else -np.inf out = (np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX), res_out_) return out if isinstance(out, tuple) else (out, None) @@ -3141,6 +3081,38 @@ def argmin( } return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + def cumulative_sum( + self, + axis=None, + include_initial: bool = False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): + reduce_args = { + "op": ReduceOp.CUMULATIVE_SUM, + "axis": axis, + "include_initial": include_initial, + } + if self.ndim != 1 and axis is None: + raise ValueError("axis must be specified for cumulative_sum of non-1D array.") + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + + def cumulative_prod( + self, + axis=None, + include_initial: bool = False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): + reduce_args = { + "op": ReduceOp.CUMULATIVE_PROD, + "axis": axis, + "include_initial": include_initial, + } + if self.ndim != 1 and axis is None: + raise ValueError("axis must be specified for cumulative_prod of non-1D array.") + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + def _eval_constructor(self, expression, constructor, operands): """Evaluate a constructor function inside a string expression.""" @@ -3192,22 +3164,7 @@ def find_args(expr): return value, expression[idx:idx2] - def _compute_expr(self, item, kwargs): # noqa : C901 - # ne_evaluate will need safe_blosc2_globals for some functions (e.g. 
clip, logaddexp) - # that are implemented in python-blosc2 not in numexpr - global safe_blosc2_globals - if len(safe_blosc2_globals) == 0: - # First eval call, fill blosc2_safe_globals for ne_evaluate - safe_blosc2_globals = {"blosc2": blosc2} - # Add all first-level blosc2 functions - safe_blosc2_globals.update( - { - name: getattr(blosc2, name) - for name in dir(blosc2) - if callable(getattr(blosc2, name)) and not name.startswith("_") - } - ) - + def _compute_expr(self, item, kwargs): if any(method in self.expression for method in eager_funcs): # We have reductions in the expression (probably coming from a string lazyexpr) # Also includes slice diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index 13144f4c..121d6a73 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -538,6 +538,78 @@ def sum( return ndarr.sum(axis=axis, dtype=dtype, keepdims=keepdims, **kwargs) +def cumulative_sum( + ndarr: blosc2.Array, + axis: int | tuple[int] | None = None, + dtype: np.dtype | str = None, + include_initial: bool = False, + **kwargs: Any, +) -> blosc2.Array: + """ + Calculates the cumulative sum of elements in the input array ndarr. + + Parameters + ----------- + ndarr: :ref:`NDArray` or :ref:`NDField` or :ref:`C2Array` or :ref:`LazyExpr` + The input array or expression. + axis: int + Axis along which a cumulative sum must be computed. If array is 1D, axis may be None; otherwise the axis must be specified. + dtype: dtype + Data type of the returned array. + include_initial : bool + Boolean indicating whether to include the initial value as the first value in the output. Initial value will be zero. Default: False. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. + kwargs: dict, optional + Additional keyword arguments supported by the :func:`empty` constructor. + + Returns + ------- + out: blosc2.Array + An array containing the cumulative sums. Let N be the size of the axis along which to compute the cumulative sum. + If include_initial is True, the returned array has the same shape as ndarr, except the size of the axis along which to compute the cumulative sum is N+1. + If include_initial is False, the returned array has the same shape as ndarr. + """ + return ndarr.cumulative_sum(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + +def cumulative_prod( + ndarr: blosc2.Array, + axis: int | tuple[int] | None = None, + dtype: np.dtype | str = None, + include_initial: bool = False, + **kwargs: Any, +) -> blosc2.Array: + """ + Calculates the cumulative product of elements in the input array ndarr. + + Parameters + ----------- + ndarr: :ref:`NDArray` or :ref:`NDField` or :ref:`C2Array` or :ref:`LazyExpr` + The input array or expression. + axis: int + Axis along which a cumulative product must be computed. If array is 1D, axis may be None; otherwise the axis must be specified. + dtype: dtype + Data type of the returned array. + include_initial : bool + Boolean indicating whether to include the initial value as the first value in the output. Initial value will be one. Default: False. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. + kwargs: dict, optional + Additional keyword arguments supported by the :func:`empty` constructor. 
+ + Returns + ------- + out: blosc2.Array + An array containing the cumulative products. Let N be the size of the axis along which to compute the cumulative product. + If include_initial is True, the returned array has the same shape as ndarr, except the size of the axis along which to compute the cumulative product is N+1. + If include_initial is False, the returned array has the same shape as ndarr. + """ + return ndarr.cumulative_prod(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + def mean( ndarr: blosc2.Array, axis: int | tuple[int] | None = None, @@ -3371,6 +3443,16 @@ def sum(self, axis=None, dtype=None, keepdims=False, **kwargs): expr = blosc2.LazyExpr(new_op=(self, None, None)) return expr.sum(axis=axis, dtype=dtype, keepdims=keepdims, **kwargs) + @is_documented_by(cumulative_sum) + def cumulative_sum(self, axis=None, dtype=None, include_initial=False, **kwargs): + expr = blosc2.LazyExpr(new_op=(self, None, None)) + return expr.cumulative_sum(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + @is_documented_by(cumulative_prod) + def cumulative_prod(self, axis=None, dtype=None, include_initial=False, **kwargs): + expr = blosc2.LazyExpr(new_op=(self, None, None)) + return expr.cumulative_prod(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + @is_documented_by(mean) def mean(self, axis=None, dtype=None, keepdims=False, **kwargs): expr = blosc2.LazyExpr(new_op=(self, None, None)) return expr.mean(axis=axis, dtype=dtype, keepdims=keepdims, **kwargs) diff --git a/src/blosc2/utils.py b/src/blosc2/utils.py index 2903c8c3..ec6602f7 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -9,12 +9,15 @@ import builtins import math import warnings +from itertools import product import ndindex import numpy as np from ndindex.subindex_helpers import ceiling from numpy import broadcast_shapes +import blosc2 + # NumPy version and a convenient boolean flag NUMPY_GE_2_0 = np.__version__ >= "2.0" # handle different numpy versions @@ -24,11 +27,19 @@ npbinvert = np.bitwise_invert npvecdot = np.vecdot nptranspose = np.permute_dims + if np.__version__ >= "2.1": + npcumsum = np.cumulative_sum + npcumprod = np.cumulative_prod + else: + npcumsum = np.cumsum + npcumprod = np.cumprod else: # not array-api compliant nplshift = np.left_shift nprshift = np.right_shift npbinvert = np.bitwise_not nptranspose = np.transpose + npcumsum = np.cumsum + npcumprod = np.cumprod def npvecdot(a, b, axis=-1): return np.einsum("...i,...i->...", np.moveaxis(np.conj(a), axis, -1), np.moveaxis(b, axis, -1)) @@ -758,14 +769,81 @@ def _get_local_slice(prior_selection, post_selection, chunk_bounds): return locbegin, locend -def get_intersecting_chunks(_slice, shape, chunks): - if 0 not in chunks: - chunk_size = ndindex.ChunkSize(chunks) - return chunk_size.as_subchunks(_slice, shape) # if _slice is (), returns all chunks +def _sliced_chunk_iter(chunks, idx, shape, axis=None, nchunk=False): + """ + If nchunk is True, return an iterator over the chunk numbers rather than the chunk slices.
+ """ + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) + idx = ndindex.ndindex(idx).expand(shape) + if axis is not None: + idx = tuple(a for i, a in enumerate(idx.args) if i != axis) + (idx.args[axis],) + chunks_ = tuple(a for i, a in enumerate(chunks) if i != axis) + (chunks[axis],) else: - return ( - ndindex.ndindex(...).expand(shape), - ) # chunk is whole array so just return full tuple to do loop once + chunks_ = chunks + idx_iter = iter(idx) # iterate over tuple of slices in order + chunk_iter = iter(chunks_) # iterate over chunk_shape in order + + iters = [] + while True: + try: + i = next(idx_iter) # slice along axis + n = next(chunk_iter) # chunklen along dimension + except StopIteration: + break + if not isinstance(i, ndindex.Slice): + raise ValueError("Only slices may be used with axis arg") + + def _slice_iter(s, n): + a, N, m = s.args + if m > n: + yield from ((a + k * m) // n for k in range(ceiling(N - a, m))) + else: + yield from range(a // n, ceiling(N, n)) + + iters.append(_slice_iter(i, n)) + + def _indices(iters): + my_list = [ndindex.Slice(None, None)] * len(chunks) + for p in product(*iters): + # p increments over arg axis first before other axes + # p = (...., -1, axis) + if axis is None: + my_list = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape, chunks, p, strict=True) + ] + else: + my_list[:axis] = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape[:axis], chunks[:axis], p[:axis], strict=True) + ] + n, cs, ci = shape[axis], chunks[axis], p[-1] + my_list[axis] = ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + my_list[axis + 1 :] = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape[axis + 1 :], chunks[axis + 1 :], p[axis:-1], strict=True) + ] + if nchunk: + yield builtins.sum( + [c.start // chunks[i] * np.prod(ratio[i + 1 :]) for i, c in enumerate(my_list)] + ) + else: + yield ndindex.Tuple(*my_list) + + yield from _indices(iters) + + +def get_intersecting_chunks(idx, shape, chunks, axis=None): + if len(chunks) != len(shape): + raise ValueError("chunks must be same length as shape!") + if 0 in chunks: # chunk is whole array so just return full tuple to do loop once + return (ndindex.ndindex(...).expand(shape),), range(0) + chunk_size = ndindex.ChunkSize(chunks) + if axis is None: + return chunk_size.as_subchunks(idx, shape) # if _slice is (), returns all chunks + + # special algorithm to iterate over axis first (adapted from ndindex source) + return _sliced_chunk_iter(chunks, idx, shape, axis) def get_chunks_idx(shape, chunks): @@ -781,3 +859,107 @@ def process_key(key, shape): ) # mask to track dummy dims introduced by int -> slice(k, k+1) key = tuple(slice(k, k + 1, None) if isinstance(k, int) else k for k in key) # key is slice, None, int return key, mask + + +def check_smaller_shape(value_shape, shape, slice_shape, slice_): + """Check whether the shape of the value is smaller than the shape of the array. + + This follows the NumPy broadcasting rules. 
+ """ + # slice_shape must be as long as shape + if len(slice_shape) != len(slice_): + raise ValueError("slice_shape must be as long as slice_") + no_nones_shape = tuple(sh for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) + no_nones_slice = tuple(s for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) + is_smaller_shape = any( + s > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_shape) + ) + slice_past_bounds = any( + s.stop > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_slice) + ) + return len(value_shape) < len(shape) or is_smaller_shape or slice_past_bounds + + +def _compute_smaller_slice(larger_shape, smaller_shape, larger_slice): + smaller_slice = [] + diff_dims = len(larger_shape) - len(smaller_shape) + + for i in range(len(larger_shape)): + if i < diff_dims: + # For leading dimensions of the larger array that the smaller array doesn't have, + # we don't add anything to the smaller slice + pass + else: + # For dimensions that both arrays have, the slice for the smaller array should be + # the same as the larger array unless the smaller array's size along that dimension + # is 1, in which case we use None to indicate the full slice + if smaller_shape[i - diff_dims] != 1: + smaller_slice.append(larger_slice[i]) + else: + smaller_slice.append(slice(0, larger_shape[i])) + + return tuple(smaller_slice) + + +# A more compact version of the function above, albeit less readable +def compute_smaller_slice(larger_shape, smaller_shape, larger_slice): + """ + Returns the slice of the smaller array that corresponds to the slice of the larger array. + """ + j_small = len(smaller_shape) - 1 + j_large = len(larger_shape) - 1 + smaller_shape_nones = [] + larger_shape_nones = [] + for s in reversed(larger_slice): + if s is None: + smaller_shape_nones.append(1) + larger_shape_nones.append(1) + else: + if j_small >= 0: + smaller_shape_nones.append(smaller_shape[j_small]) + j_small -= 1 + if j_large >= 0: + larger_shape_nones.append(larger_shape[j_large]) + j_large -= 1 + smaller_shape_nones.reverse() + larger_shape_nones.reverse() + diff_dims = len(larger_shape_nones) - len(smaller_shape_nones) + return tuple( + None + if larger_slice[i] is None + else ( + larger_slice[i] if smaller_shape_nones[i - diff_dims] != 1 else slice(0, larger_shape_nones[i]) + ) + for i in range(diff_dims, len(larger_shape_nones)) + ) + + +def _get_chunk_operands(operands, cslice, chunk_operands, shape): + # Get the starts and stops for the slice + cslice_shape = tuple(s.stop - s.start for s in cslice) + starts = [s.start if s.start is not None else 0 for s in cslice] + stops = [s.stop if s.stop is not None else sh for s, sh in zip(cslice, cslice_shape, strict=True)] + unit_steps = np.all([s.step == 1 for s in cslice]) + # Get the slice of each operand + for key, value in operands.items(): + if np.isscalar(value): + chunk_operands[key] = value + continue + if value.shape == (): + chunk_operands[key] = value[()] + continue + if check_smaller_shape(value.shape, shape, cslice_shape, cslice): + # We need to fetch the part of the value that broadcasts with the operand + smaller_slice = compute_smaller_slice(shape, value.shape, cslice) + chunk_operands[key] = value[smaller_slice] + continue + # If key is in operands, we can reuse the buffer + if ( + key in chunk_operands + and cslice_shape == chunk_operands[key].shape + and isinstance(value, blosc2.NDArray) + and unit_steps + ): + value.get_slice_numpy(chunk_operands[key], (starts, 
stops)) + continue + chunk_operands[key] = value[cslice] diff --git a/tests/ndarray/test_reductions.py b/tests/ndarray/test_reductions.py index ca14f790..c1a38822 100644 --- a/tests/ndarray/test_reductions.py +++ b/tests/ndarray/test_reductions.py @@ -11,7 +11,7 @@ import pytest import blosc2 -from blosc2.lazyexpr import ne_evaluate +from blosc2.lazyexpr import ne_evaluate, npcumsum NITEMS_SMALL = 1000 NITEMS = 10_000 @@ -52,24 +52,34 @@ def array_fixture(dtype_fixture, shape_fixture): # @pytest.mark.parametrize("reduce_op", ["sum"]) -@pytest.mark.parametrize("reduce_op", ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin"]) +@pytest.mark.parametrize( + "reduce_op", + ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin", "cumulative_sum", "cumulative_prod"], +) def test_reduce_bool(array_fixture, reduce_op): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture expr = (a1 + a2) > (a3 * a4) nres = ne_evaluate("(na1 + na2) > (na3 * na4)") - res = getattr(expr, reduce_op)() - # res = expr.sum() - # print("res:", res) - nres = getattr(nres, reduce_op)() + axis = None + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(nres, axis={axis})") + else: + nres = getattr(nres, reduce_op)(axis=axis) + res = getattr(expr, reduce_op)(axis=axis) tol = 1e-15 if a1.dtype == "float64" else 1e-6 np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # @pytest.mark.parametrize("reduce_op", ["sum"]) -@pytest.mark.parametrize("reduce_op", ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin"]) +@pytest.mark.parametrize( + "reduce_op", + ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin", "cumulative_sum", "cumulative_prod"], +) def test_reduce_where(array_fixture, reduce_op): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture - if reduce_op == "prod": + if reduce_op in {"prod", "cumulative_prod"}: # To avoid overflow, create a1 and a2 with small values na1 = np.linspace(0, 0.1, np.prod(a1.shape), dtype=np.float32).reshape(a1.shape) a1 = blosc2.asarray(na1) @@ -80,8 +90,14 @@ def test_reduce_where(array_fixture, reduce_op): else: expr = blosc2.where(a1 < a2, a2, a1) nres = eval("np.where(na1 < na2, na2, na1)") - res = getattr(expr, reduce_op)() - nres = getattr(nres, reduce_op)() + axis = None + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(nres, axis={axis})") + else: + nres = getattr(nres, reduce_op)(axis=axis) + res = getattr(expr, reduce_op)(axis=axis) # print("res:", res, nres, type(res), type(nres)) tol = 1e-15 if a1.dtype == "float64" else 1e-6 np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) @@ -105,7 +121,22 @@ def test_fp_accuracy(accuracy, dtype): @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "mean", "std", "var", "min", "max", "any", "all", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "mean", + "std", + "var", + "min", + "max", + "any", + "all", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [1, (0, 1), None]) @pytest.mark.parametrize("keepdims", [True, False]) @@ -117,11 +148,21 @@ def test_fp_accuracy(accuracy, dtype): @pytest.mark.heavy def test_reduce_params(array_fixture, axis, keepdims, dtype_out, reduce_op, kwargs): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture + reduce_args = {"axis": axis} + if reduce_op in {"cumulative_sum", 
"cumulative_prod"}: + reduce_args["include_initial"] = keepdims + else: + reduce_args["keepdims"] = keepdims + if reduce_op in ("mean", "std") and dtype_out == np.int16: + # mean and std need float dtype as output + dtype_out = np.float64 + if reduce_op in ("sum", "prod", "mean", "std"): + reduce_args["dtype"] = dtype_out if axis is not None and np.isscalar(axis) and len(a1.shape) >= axis: return if isinstance(axis, tuple) and (len(a1.shape) < len(axis) or reduce_op in ("argmax", "argmin")): return - if reduce_op == "prod": + if reduce_op in {"prod", "cumulative_prod"}: # To avoid overflow, create a1 and a2 with small values na1 = np.linspace(0, 0.1, np.prod(a1.shape), dtype=np.float32).reshape(a1.shape) a1 = blosc2.asarray(na1) @@ -132,15 +173,9 @@ def test_reduce_params(array_fixture, axis, keepdims, dtype_out, reduce_op, kwar else: expr = a1 + a2 - a3 * a4 nres = eval("na1 + na2 - na3 * na4") - if reduce_op in ("sum", "prod", "mean", "std"): - if reduce_op in ("mean", "std") and dtype_out == np.int16: - # mean and std need float dtype as output - dtype_out = np.float64 - res = getattr(expr, reduce_op)(axis=axis, keepdims=keepdims, dtype=dtype_out, **kwargs) - nres = getattr(nres, reduce_op)(axis=axis, keepdims=keepdims, dtype=dtype_out) - else: - res = getattr(expr, reduce_op)(axis=axis, keepdims=keepdims, **kwargs) - nres = getattr(nres, reduce_op)(axis=axis, keepdims=keepdims) + + res = getattr(expr, reduce_op)(**reduce_args, **kwargs) + nres = getattr(nres, reduce_op)(**reduce_args) tol = 1e-15 if a1.dtype == "float64" else 1e-6 if kwargs != {}: if not np.isscalar(res): @@ -152,7 +187,8 @@ def test_reduce_params(array_fixture, axis, keepdims, dtype_out, reduce_op, kwar # TODO: "prod" is not supported here because it overflows with current values @pytest.mark.parametrize( - "reduce_op", ["sum", "min", "max", "mean", "std", "var", "any", "all", "argmax", "argmin"] + "reduce_op", + ["sum", "min", "max", "mean", "std", "var", "any", "all", "argmax", "argmin", "cumulative_sum"], ) @pytest.mark.parametrize("axis", [0, 1, None]) def test_reduce_expr_arr(array_fixture, axis, reduce_op): @@ -161,15 +197,33 @@ def test_reduce_expr_arr(array_fixture, axis, reduce_op): return expr = a1 + a2 - a3 * a4 nres = eval("na1 + na2 - na3 * na4") + if reduce_op == "cumulative_sum": + axis = 0 if axis is None else axis + nres = npcumsum(nres, axis=axis) + npcumsum(na1, axis=axis) + else: + nres = getattr(nres, reduce_op)(axis=axis) + getattr(na1, reduce_op)(axis=axis) res = getattr(expr, reduce_op)(axis=axis) + getattr(a1, reduce_op)(axis=axis) - nres = getattr(nres, reduce_op)(axis=axis) + getattr(na1, reduce_op)(axis=axis) tol = 1e-15 if a1.dtype == "float64" else 1e-6 np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # Test broadcasting @pytest.mark.parametrize( - "reduce_op", ["sum", "mean", "std", "var", "min", "max", "any", "all", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "mean", + "std", + "var", + "min", + "max", + "any", + "all", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) @pytest.mark.parametrize("keepdims", [True, False]) @@ -182,8 +236,14 @@ def test_reduce_expr_arr(array_fixture, axis, reduce_op): ], ) def test_broadcast_params(axis, keepdims, reduce_op, shapes): - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if reduce_op[:3] == 
"cum" else axis + reduce_args = {"axis": axis} + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + reduce_args["include_initial"] = keepdims + else: + reduce_args["keepdims"] = keepdims na1 = np.linspace(0, 1, np.prod(shapes[0])).reshape(shapes[0]) na2 = np.linspace(1, 2, np.prod(shapes[1])).reshape(shapes[1]) na3 = np.linspace(2, 3, np.prod(shapes[2])).reshape(shapes[2]) @@ -192,12 +252,17 @@ def test_broadcast_params(axis, keepdims, reduce_op, shapes): a3 = blosc2.asarray(na3) expr1 = a1 + a2 - a3 assert expr1.shape == shapes[0] - expr2 = a1 * a2 + 1 - assert expr2.shape == shapes[0] - res = expr1 - getattr(expr2, reduce_op)(axis=axis, keepdims=keepdims) - assert res.shape == shapes[0] + expr2 = a2 * a3 + 1 + assert expr2.shape == shapes[1] # print(f"res: {res.shape} expr1: {expr1.shape} expr2: {expr2.shape}") - nres = eval(f"na1 + na2 - na3 - (na1 * na2 + 1).{reduce_op}(axis={axis}, keepdims={keepdims})") + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + res = expr2 - getattr(expr1, reduce_op)(**reduce_args) + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + expr = f"na2 * na3 + 1 - {oploc}(na1 + na2 - na3, axis={axis}, include_initial={keepdims})" + else: + res = expr1 - getattr(expr2, reduce_op)(**reduce_args) + expr = f"na1 + na2 - na3 - (na2 * na3 + 1).{reduce_op}(axis={axis}, keepdims={keepdims})" + nres = eval(expr) tol = 1e-14 if a1.dtype == "float64" else 1e-5 np.testing.assert_allclose(res[:], nres, atol=tol, rtol=tol) @@ -205,7 +270,22 @@ def test_broadcast_params(axis, keepdims, reduce_op, shapes): # Test reductions with item parameter @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) @pytest.mark.parametrize("stripes", ["rows", "columns"]) @@ -223,7 +303,7 @@ def test_reduce_item(reduce_op, dtype, stripes, stripe_len, shape, chunks): else: _slice = (slice(None), slice(i, i + stripe_len)) slice_ = na[_slice] - if slice_.size == 0 and reduce_op not in ("sum", "prod"): + if slice_.size == 0 and reduce_op not in ("sum", "prod", "cumulative_sum", "cumulative_prod"): # For mean, std, and var, numpy just raises a warning, so don't check if reduce_op in ("min", "max", "argmin", "argmax"): # Check that a ValueError is raised when the slice is empty @@ -238,7 +318,22 @@ def test_reduce_item(reduce_op, dtype, stripes, stripe_len, shape, chunks): @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) def test_reduce_slice(reduce_op): shape = (8, 12, 5) @@ -247,24 +342,30 @@ def test_reduce_slice(reduce_op): tol = 1e-6 if na.dtype == np.float32 else 1e-15 _slice = (slice(1, 2, 1), slice(3, 7, 1)) res = getattr(a, reduce_op)(item=_slice, axis=-1) - nres = getattr(na[_slice], reduce_op)(axis=-1) + if reduce_op == "cumulative_sum": + oploc = "npcumsum" + elif reduce_op == "cumulative_prod": + oploc = "npcumprod" + else: + oploc = f"np.{reduce_op}" + nres = eval(f"{oploc}(na[_slice], axis=-1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # Test reductions with slices and strides _slice = 
(slice(1, 2, 1), slice(1, 9, 2)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # Test reductions with ints _slice = (0, slice(1, 9, 1)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) _slice = (0, slice(1, 9, 2)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) @@ -272,19 +373,34 @@ def test_reduce_slice(reduce_op): @pytest.mark.parametrize( ("chunks", "blocks"), [ + ((10, 50, 70), (10, 25, 50)), ((20, 50, 100), (10, 50, 100)), - ((10, 25, 70), (10, 25, 50)), ((10, 50, 100), (6, 25, 75)), ((15, 30, 75), (7, 20, 50)), - ((20, 50, 100), (10, 50, 60)), + ((1, 50, 100), (1, 50, 60)), ], ) @pytest.mark.parametrize("disk", [True, False]) -@pytest.mark.parametrize("fill_value", [0, 1, 0.32]) +@pytest.mark.parametrize("fill_value", [1, 0, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) -@pytest.mark.parametrize("axis", [0, 1, None]) +@pytest.mark.parametrize("axis", [None, 0, 1]) def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis): shape = (20, 50, 100) urlpath = "a1.b2nd" if disk else None @@ -295,10 +411,26 @@ def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis): if disk: a = blosc2.open(urlpath) na = a[:] - + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 if axis is None else axis + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(na, axis={axis})") + else: + nres = getattr(na, reduce_op)(axis=axis) res = getattr(a, reduce_op)(axis=axis) - nres = getattr(na, reduce_op)(axis=axis) + assert np.allclose(res, nres) + # Try with a slice + b = blosc2.arange(0, np.prod(shape), blocks=blocks, chunks=chunks, shape=shape) + nb = b[:] + slice_ = (slice(10, 20),) + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 if axis is None else axis + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}((na + nb)[{slice_}], axis={axis})") + else: + nres = getattr((na + nb)[slice_], reduce_op)(axis=axis) + res = getattr(a + b, reduce_op)(axis=axis, item=slice_) assert np.allclose(res, nres) @@ -340,13 +472,29 @@ def test_miniexpr_slice(chunks, blocks, disk, fill_value, reduce_op): @pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version1(disk, fill_value, reduce_op, axis): shape = (20, 50, 100) - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): 
@@ -340,13 +472,29 @@ def test_miniexpr_slice(chunks, blocks, disk, fill_value, reduce_op):
 @pytest.mark.parametrize("disk", [True, False])
 @pytest.mark.parametrize("fill_value", [0, 1, 0.32])
 @pytest.mark.parametrize(
-    "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"]
+    "reduce_op",
+    [
+        "sum",
+        "prod",
+        "min",
+        "max",
+        "any",
+        "all",
+        "mean",
+        "std",
+        "var",
+        "argmax",
+        "argmin",
+        "cumulative_sum",
+        "cumulative_prod",
+    ],
 )
 @pytest.mark.parametrize("axis", [0, (0, 1), None])
 def test_save_version1(disk, fill_value, reduce_op, axis):
     shape = (20, 50, 100)
-    if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")):
-        axis = 1
+    if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"):
+        axis = 1 if isinstance(axis, tuple) else axis
+        axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis
         shape = (20, 20, 100)
     urlpath = "a1.b2nd" if disk else None
     if fill_value != 0:
@@ -369,7 +517,11 @@ def test_save_version1(disk, fill_value, reduce_op, axis):
     lexpr.save("out.b2nd")
     lexpr = blosc2.open("out.b2nd")
     res = lexpr.compute()
-    nres = na + getattr(nb[()], reduce_op)(axis=axis) + 1
+    if reduce_op in {"cumulative_sum", "cumulative_prod"}:
+        oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod"
+        nres = na + eval(f"{oploc}(nb, axis={axis})") + 1
+    else:
+        nres = na + getattr(nb, reduce_op)(axis=axis) + 1
     assert np.allclose(res[()], nres)

     if disk:
@@ -381,13 +533,29 @@ def test_save_version1(disk, fill_value, reduce_op, axis):
 @pytest.mark.parametrize("disk", [True, False])
 @pytest.mark.parametrize("fill_value", [0, 1, 0.32])
 @pytest.mark.parametrize(
-    "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"]
+    "reduce_op",
+    [
+        "sum",
+        "prod",
+        "min",
+        "max",
+        "any",
+        "all",
+        "mean",
+        "std",
+        "var",
+        "argmax",
+        "argmin",
+        "cumulative_sum",
+        "cumulative_prod",
+    ],
 )
 @pytest.mark.parametrize("axis", [0, (0, 1), None])
 def test_save_version2(disk, fill_value, reduce_op, axis):
     shape = (20, 50, 100)
-    if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")):
-        axis = 1
+    if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"):
+        axis = 1 if isinstance(axis, tuple) else axis
+        axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis
         shape = (20, 20, 100)
     urlpath = "a1.b2nd" if disk else None
     if fill_value != 0:
@@ -409,7 +577,11 @@ def test_save_version2(disk, fill_value, reduce_op, axis):
     lexpr.save("out.b2nd")
     lexpr = blosc2.open("out.b2nd")
     res = lexpr.compute()
-    nres = getattr(na[()], reduce_op)(axis=axis) + nb
+    if reduce_op in {"cumulative_sum", "cumulative_prod"}:
+        oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod"
+        nres = eval(f"{oploc}(na, axis={axis})") + nb
+    else:
+        nres = getattr(na, reduce_op)(axis=axis) + nb
     assert np.allclose(res[()], nres)

     if disk:
@@ -421,13 +593,29 @@ def test_save_version2(disk, fill_value, reduce_op, axis):
 @pytest.mark.parametrize("disk", [True, False])
 @pytest.mark.parametrize("fill_value", [0, 1, 0.32])
 @pytest.mark.parametrize(
-    "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"]
+    "reduce_op",
+    [
+        "sum",
+        "prod",
+        "min",
+        "max",
+        "any",
+        "all",
+        "mean",
+        "std",
+        "var",
+        "argmax",
+        "argmin",
+        "cumulative_sum",
+        "cumulative_prod",
+    ],
 )
 @pytest.mark.parametrize("axis", [0, (0, 1), None])
 def test_save_version3(disk, fill_value, reduce_op, axis):
     shape = (20, 50, 100)
-    if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")):
-        axis = 1
+    if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"):
+        axis = 1 if isinstance(axis, tuple) else axis
+        axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis
         shape = (20, 20, 100)
     urlpath = "a1.b2nd" if disk else None
     if fill_value != 0:
@@ -449,7 +637,11 @@ def test_save_version3(disk, fill_value, reduce_op, axis):
     lexpr.save("out.b2nd")
     lexpr = blosc2.open("out.b2nd")
     res = lexpr.compute()
-    nres = getattr(na[()], reduce_op)(axis=axis) + nb
+    if reduce_op in {"cumulative_sum", "cumulative_prod"}:
+        oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod"
+        nres = eval(f"{oploc}(na, axis={axis})") + nb
+    else:
+        nres = getattr(na, reduce_op)(axis=axis) + nb
     assert np.allclose(res[()], nres)

     if disk:
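
Note (illustrative, not part of the patch): the `test_save_version*` tests above (and `test_save_version4` below) repeat the same inline axis fix-up: tuple axes are collapsed to a single axis for `argmax`/`argmin` and the cumulative ops, and `axis=None` is mapped to `0` for the cumulative ops, which scan along exactly one axis (the array-API cumulative functions require an explicit axis for multi-dimensional input). A small sketch of that logic as a helper, with an invented name (`normalize_axis`); the tests inline it instead:

```python
def normalize_axis(reduce_op, axis):
    """Mirror the inline axis adjustment used in the test_save_version tests."""
    if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"):
        # These ops work along a single axis, so collapse tuple axes.
        axis = 1 if isinstance(axis, tuple) else axis
        # Cumulative ops need a concrete axis; default None to the first one.
        axis = 0 if (reduce_op.startswith("cum") and axis is None) else axis
    return axis


assert normalize_axis("argmax", (0, 1)) == 1
assert normalize_axis("cumulative_sum", None) == 0
assert normalize_axis("sum", (0, 1)) == (0, 1)
```
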
@@ -461,12 +653,29 @@ def test_save_version3(disk, fill_value, reduce_op, axis):
 @pytest.mark.parametrize("disk", [True, False])
 @pytest.mark.parametrize("fill_value", [0, 1, 0.32])
 @pytest.mark.parametrize(
-    "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"]
+    "reduce_op",
+    [
+        "sum",
+        "prod",
+        "min",
+        "max",
+        "any",
+        "all",
+        "mean",
+        "std",
+        "var",
+        "argmax",
+        "argmin",
+        "cumulative_sum",
+        "cumulative_prod",
+    ],
 )
 @pytest.mark.parametrize("axis", [0, (0, 1), None])
 def test_save_version4(disk, fill_value, reduce_op, axis):
-    if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")):
-        axis = 1
     shape = (20, 50, 100)
+    if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"):
+        axis = 1 if isinstance(axis, tuple) else axis
+        axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis
+        shape = (20, 20, 100)
     urlpath = "a1.b2nd" if disk else None
     if fill_value != 0:
@@ -487,7 +696,11 @@ def test_save_version4(disk, fill_value, reduce_op, axis):
     lexpr.save("out.b2nd")
     lexpr = blosc2.open("out.b2nd")
     res = lexpr.compute()
-    nres = getattr(na[()], reduce_op)(axis=axis)
+    if reduce_op in {"cumulative_sum", "cumulative_prod"}:
+        oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod"
+        nres = eval(f"{oploc}(na, axis={axis})")
+    else:
+        nres = getattr(na, reduce_op)(axis=axis)
     assert np.allclose(res[()], nres)

     if disk: