Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 35 additions & 30 deletions albucore/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,13 @@ def apply_lut(
num_channels = img.shape[-1]

luts = clip(create_lut_array(dtype, value, operation), dtype, inplace=False)
return cv2.merge([sz_lut(img[:, :, i], luts[i], inplace) for i in range(num_channels)])

result = np.empty_like(img, dtype=np.float32)

for i in range(num_channels):
result[..., i] = sz_lut(img[..., i], luts[i], inplace)

return result


def prepare_value_opencv(
Expand Down Expand Up @@ -309,15 +315,9 @@ def normalize_lut(img: np.ndarray, mean: float | np.ndarray, denominator: float
lut = ((np.arange(0, max_value + 1, dtype=np.float32) - mean) * denominator).astype(np.float32)
return cv2.LUT(img, lut)

# Convert to float32 if needed
if isinstance(mean, np.ndarray):
mean = mean.astype(np.float32, copy=False)
if isinstance(denominator, np.ndarray):
denominator = denominator.astype(np.float32, copy=False)

# Vectorized LUT creation - shape: (256, num_channels)
arange_vals = np.arange(0, max_value + 1, dtype=np.float32)
luts = (arange_vals[:, np.newaxis] - mean) * denominator
luts = ((arange_vals[:, np.newaxis] - mean) * denominator).astype(np.float32)

# Pre-allocate result array
result = np.empty_like(img, dtype=np.float32)
Expand Down Expand Up @@ -363,7 +363,6 @@ def power_opencv(img: np.ndarray, value: float) -> np.ndarray:
raise ValueError(f"Unsupported image type {img.dtype} for power operation with value {value}")


# @preserve_channel_dim
def power_lut(img: np.ndarray, exponent: float | np.ndarray, inplace: bool = False) -> np.ndarray:
return apply_lut(img, exponent, "power", inplace)

Expand Down Expand Up @@ -446,11 +445,11 @@ def multiply_add_opencv(img: np.ndarray, factor: ValueType, value: ValueType) ->

result = img.astype(np.float32, copy=False)
result = (
cv2.multiply(result, np.ones_like(result) * factor, dtype=cv2.CV_64F)
cv2.multiply(result, np.ones_like(result) * factor, dtype=cv2.CV_32F)
if factor != 0
else np.zeros_like(result, dtype=img.dtype)
)
return result if value == 0 else cv2.add(result, np.ones_like(result) * value, dtype=cv2.CV_64F)
return result if value == 0 else cv2.add(result, np.ones_like(result) * value, dtype=cv2.CV_32F)


def multiply_add_lut(img: np.ndarray, factor: ValueType, value: ValueType, inplace: bool) -> np.ndarray:
Expand All @@ -468,9 +467,13 @@ def multiply_add_lut(img: np.ndarray, factor: ValueType, value: ValueType, inpla
if isinstance(value, np.ndarray) and value.shape != ():
value = value.reshape(-1, 1)

luts = clip(np.arange(0, max_value + 1, dtype=np.float32) * factor + value, dtype, inplace=True)
luts = clip(np.arange(0, max_value + 1, dtype=np.float32) * factor + value, dtype, inplace=False)

result = np.empty_like(img, dtype=np.float32)
for i in range(num_channels):
result[..., i] = sz_lut(img[..., i], luts[i], inplace)

return cv2.merge([sz_lut(img[:, :, i], luts[i], inplace) for i in range(num_channels)])
return result


@clipped
Expand Down Expand Up @@ -512,8 +515,9 @@ def _compute_per_channel_stats_opencv(img: np.ndarray) -> tuple[np.ndarray, np.n
return mean, std


def _normalize_mean_std_opencv(img_f: np.ndarray, mean: float | np.ndarray, std: float | np.ndarray) -> np.ndarray:
def _normalize_mean_std_opencv(img: np.ndarray, mean: float | np.ndarray, std: float | np.ndarray) -> np.ndarray:
"""Apply mean-std normalization using OpenCV or NumPy based on dimensionality."""
img_f = img.astype(np.float32, copy=False)
if img_f.ndim > 3:
# Use NumPy operations for 4D/5D (faster)
normalized_img = (img_f - mean) / std
Expand Down Expand Up @@ -586,13 +590,11 @@ def normalize_per_image_opencv(

if normalization == "image":
mean, std = _compute_image_stats_opencv(img)
img_f = img.astype(np.float32, copy=False)
return _normalize_mean_std_opencv(img_f, mean, std)
return _normalize_mean_std_opencv(img, mean, std)

if normalization == "image_per_channel":
mean, std = _compute_per_channel_stats_opencv(img)
img_f = img.astype(np.float32, copy=False)
return _normalize_mean_std_opencv(img_f, mean, std)
return _normalize_mean_std_opencv(img, mean, std)

if normalization == "min_max":
return cv2.normalize(img, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
Expand Down Expand Up @@ -712,29 +714,33 @@ def normalize_per_image_lut(
mean = img.mean()
std = img.std() + eps

lut = ((np.arange(0, max_value + 1, dtype=np.float32) - mean) / std).astype(np.float32)
return cv2.LUT(img, lut).clip(-20, 20)
lut = ((np.arange(0, max_value + 1, dtype=np.float32) - mean) / std).clip(-20, 20).astype(np.float32)
return cv2.LUT(img, lut)

if normalization == "image_per_channel":
axes = tuple(range(img.ndim - 1)) # All axes except channel
pixel_mean = img.mean(axis=axes).astype(np.float32)
pixel_std = img.std(axis=axes).astype(np.float32) + np.float32(eps)
pixel_mean = img.mean(axis=axes)
pixel_std = img.std(axis=axes) + eps

# Create all LUTs at once using vectorized operations
arange_vals = np.arange(0, max_value + 1, dtype=np.float32)
# LUTs shape will be (256, num_channels)
luts = (arange_vals[:, np.newaxis] - pixel_mean) / pixel_std
luts = ((arange_vals[:, np.newaxis] - pixel_mean) / pixel_std).clip(-20, 20).astype(np.float32)

result = np.empty_like(img, dtype=np.float32)
for i in range(num_channels):
result[..., i] = cv2.LUT(img[..., i], luts[:, i])
return result.clip(-20, 20)
return result

if normalization == "min_max" or (img.shape[-1] == 1 and normalization == "min_max_per_channel"):
img_min = img.min()
img_max = img.max()
lut = ((np.arange(0, max_value + 1, dtype=np.float32) - img_min) / (img_max - img_min + eps)).astype(np.float32)
return cv2.LUT(img, lut).clip(-20, 20)
lut = (
((np.arange(0, max_value + 1, dtype=np.float32) - img_min) / (img_max - img_min + eps))
.clip(-20, 20)
.astype(np.float32)
)
return cv2.LUT(img, lut)

if normalization == "min_max_per_channel":
axes = tuple(range(img.ndim - 1)) # All axes except channel
Expand All @@ -744,12 +750,12 @@ def normalize_per_image_lut(
# Create all LUTs at once using vectorized operations
arange_vals = np.arange(0, max_value + 1, dtype=np.float32)
# LUTs shape will be (256, num_channels)
luts = ((arange_vals[:, np.newaxis] - img_min) / (img_max - img_min + eps)).astype(np.float32)
luts = ((arange_vals[:, np.newaxis] - img_min) / (img_max - img_min + eps)).clip(-20, 20).astype(np.float32)

result = np.empty_like(img, dtype=np.float32)
for i in range(num_channels):
result[..., i] = cv2.LUT(img[..., i], luts[:, i])
return result.clip(-20, 20)
return result

raise ValueError(f"Unknown normalization method: {normalization}")

Expand Down Expand Up @@ -829,7 +835,7 @@ def to_float_lut(img: np.ndarray, max_value: float | None = None) -> np.ndarray:

if max_value is None:
max_value = MAX_VALUES_BY_DTYPE[img.dtype]
lut = np.arange(256, dtype=np.float32) / max_value
lut = (np.arange(256, dtype=np.float32) / max_value).astype(np.float32)
return cv2.LUT(img, lut)


Expand Down Expand Up @@ -949,7 +955,6 @@ def _flip_multichannel(img: np.ndarray, flip_code: int) -> np.ndarray:
Flipped image with all channels preserved
"""
# Get image dimensions
height, width = img.shape[:2]
num_channels = img.shape[2]

# If the image has fewer than 512 channels, use cv2.flip directly
Expand Down