diff --git a/src/crawlee/_autoscaling/_types.py b/src/crawlee/_autoscaling/_types.py index b231c9062d..f321214313 100644 --- a/src/crawlee/_autoscaling/_types.py +++ b/src/crawlee/_autoscaling/_types.py @@ -2,12 +2,14 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Annotated + +from pydantic import Field +from pydantic.dataclasses import dataclass as pydantic_dataclass if TYPE_CHECKING: from crawlee._utils.byte_size import ByteSize - SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97 @@ -167,3 +169,10 @@ def is_overloaded(self) -> bool: Snapshot = MemorySnapshot | CpuSnapshot | EventLoopSnapshot | ClientSnapshot + + +@pydantic_dataclass +class Ratio: + """Represents ratio of memory.""" + + value: Annotated[float, Field(gt=0.0, le=1.0)] diff --git a/src/crawlee/_autoscaling/snapshotter.py b/src/crawlee/_autoscaling/snapshotter.py index 55af9da1dd..87328f658e 100644 --- a/src/crawlee/_autoscaling/snapshotter.py +++ b/src/crawlee/_autoscaling/snapshotter.py @@ -3,17 +3,18 @@ from __future__ import annotations import bisect +import functools from datetime import datetime, timedelta, timezone from logging import getLogger from typing import TYPE_CHECKING, TypeVar, cast from crawlee import service_locator -from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Snapshot +from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Ratio, Snapshot from crawlee._utils.byte_size import ByteSize from crawlee._utils.context import ensure_context from crawlee._utils.docs import docs_group from crawlee._utils.recurring_task import RecurringTask -from crawlee._utils.system import MemoryInfo, get_memory_info +from crawlee._utils.system import MemoryInfo, MemoryUsageInfo, get_memory_info from crawlee.events._types import Event, EventSystemInfoData if TYPE_CHECKING: @@ -26,6 +27,12 @@ T = TypeVar('T', bound=Snapshot) +@functools.lru_cache +def _warn_once(warning_message: str) -> None: + """Log a warning message only once.""" + logger.warning(warning_message) + + class SortedSnapshotList(list[T]): """A list that maintains sorted order by `created_at` attribute for snapshot objects.""" @@ -69,7 +76,7 @@ def __init__( max_used_memory_ratio: float, max_event_loop_delay: timedelta, max_client_errors: int, - max_memory_size: ByteSize, + max_memory_size: ByteSize | Ratio, ) -> None: """Initialize a new instance. @@ -85,7 +92,9 @@ def __init__( value, the event loop is considered overloaded. max_client_errors: Sets the maximum number of client errors (HTTP 429). When the number of client errors is higher than the provided number, the client is considered overloaded. - max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`. + max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`. When of type + `ByteSize` then it is used as fixed memory size. When of type `Ratio` then it allows for dynamic memory + scaling based on the available system memory. """ self._max_used_cpu_ratio = max_used_cpu_ratio self._max_used_memory_ratio = max_used_memory_ratio @@ -121,7 +130,7 @@ def from_config(cls, config: Configuration | None = None) -> Snapshotter: max_memory_size = ( ByteSize.from_mb(config.memory_mbytes) if config.memory_mbytes - else ByteSize(int(get_memory_info().total_size.bytes * config.available_memory_ratio)) + else Ratio(value=config.available_memory_ratio) ) return cls( @@ -280,23 +289,55 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None: Args: event_data: System info data from which memory usage is read. """ + match event_data.memory_info, self._max_memory_size: + case MemoryInfo() as memory_info, Ratio() as ratio: + max_memory_size = memory_info.total_size * ratio.value + system_wide_used_size = memory_info.system_wide_used_size + system_wide_memory_size = memory_info.total_size + + case MemoryUsageInfo(), Ratio() as ratio: + # This is just hypothetical case, that will most likely not happen in practice. + # `LocalEventManager` should always provide `MemoryInfo` in the event data. + # When running on Apify, `self._max_memory_size` is always `ByteSize`, not `Ratio`. + _warn_once( + 'It is recommended that a custom implementation of `LocalEventManager` emits `SYSTEM_INFO` events ' + 'with `MemoryInfo` and not just `MemoryUsageInfo`.' + ) + max_memory_size = get_memory_info().total_size * ratio.value + system_wide_used_size = None + system_wide_memory_size = None + + case MemoryInfo() as memory_info, ByteSize() as byte_size: + max_memory_size = byte_size + system_wide_used_size = memory_info.system_wide_used_size + system_wide_memory_size = memory_info.total_size + + case MemoryUsageInfo(), ByteSize() as byte_size: + max_memory_size = byte_size + system_wide_used_size = None + system_wide_memory_size = None + + case _, _: + raise NotImplementedError('Unsupported combination of memory info and max memory size types.') + snapshot = MemorySnapshot( current_size=event_data.memory_info.current_size, - max_memory_size=self._max_memory_size, + max_memory_size=max_memory_size, max_used_memory_ratio=self._max_used_memory_ratio, created_at=event_data.memory_info.created_at, - system_wide_used_size=None, - system_wide_memory_size=None, + system_wide_used_size=system_wide_used_size, + system_wide_memory_size=system_wide_memory_size, ) - if isinstance(memory_info := event_data.memory_info, MemoryInfo): - snapshot.system_wide_used_size = memory_info.system_wide_used_size - snapshot.system_wide_memory_size = memory_info.total_size - snapshots = cast('list[Snapshot]', self._memory_snapshots) self._prune_snapshots(snapshots, snapshot.created_at) self._memory_snapshots.add(snapshot) - self._evaluate_memory_load(event_data.memory_info.current_size, event_data.memory_info.created_at) + + self._evaluate_memory_load( + event_data.memory_info.current_size, + event_data.memory_info.created_at, + max_memory_size=max_memory_size, + ) def _snapshot_event_loop(self) -> None: """Capture a snapshot of the current event loop usage. @@ -364,27 +405,30 @@ def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None: else: snapshots.clear() - def _evaluate_memory_load(self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime) -> None: + def _evaluate_memory_load( + self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime, max_memory_size: ByteSize + ) -> None: """Evaluate and logs critical memory load conditions based on the system information. Args: current_memory_usage_size: The current memory usage. snapshot_timestamp: The time at which the memory snapshot was taken. + max_memory_size: The maximum memory size to be used for evaluation. """ # Check if the warning has been logged recently to avoid spamming if snapshot_timestamp < self._timestamp_of_last_memory_warning + self._MEMORY_WARNING_COOLDOWN_PERIOD: return - threshold_memory_size = self._max_used_memory_ratio * self._max_memory_size - buffer_memory_size = self._max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO + threshold_memory_size = self._max_used_memory_ratio * max_memory_size + buffer_memory_size = max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO overload_memory_threshold_size = threshold_memory_size + buffer_memory_size # Log a warning if current memory usage exceeds the critical overload threshold if current_memory_usage_size > overload_memory_threshold_size: - memory_usage_percentage = round((current_memory_usage_size.bytes / self._max_memory_size.bytes) * 100) + memory_usage_percentage = round((current_memory_usage_size.bytes / max_memory_size.bytes) * 100) logger.warning( f'Memory is critically overloaded. Using {current_memory_usage_size} of ' - f'{self._max_memory_size} ({memory_usage_percentage}%). ' + f'{max_memory_size} ({memory_usage_percentage}%). ' 'Consider increasing available memory.' ) self._timestamp_of_last_memory_warning = snapshot_timestamp diff --git a/src/crawlee/configuration.py b/src/crawlee/configuration.py index d6dc6b071e..85d4b8c560 100644 --- a/src/crawlee/configuration.py +++ b/src/crawlee/configuration.py @@ -177,11 +177,14 @@ class Configuration(BaseSettings): validation_alias=AliasChoices( 'apify_available_memory_ratio', 'crawlee_available_memory_ratio', - ) + ), + gt=0.0, + le=1.0, ), ] = 0.25 """The maximum proportion of system memory to use. If `memory_mbytes` is not provided, this ratio is used to - calculate the maximum memory. This option is utilized by the `Snapshotter`.""" + calculate the maximum memory. This option is utilized by the `Snapshotter` and supports the dynamic system memory + scaling.""" storage_dir: Annotated[ str, diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index a07656fa19..a4682fce43 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -3,6 +3,7 @@ import asyncio from datetime import datetime, timedelta, timezone from logging import getLogger +from math import floor from typing import TYPE_CHECKING, cast from unittest.mock import MagicMock @@ -10,10 +11,15 @@ from crawlee import service_locator from crawlee._autoscaling import Snapshotter -from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, MemorySnapshot +from crawlee._autoscaling._types import ( + SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD, + ClientSnapshot, + CpuSnapshot, + MemorySnapshot, +) from crawlee._autoscaling.snapshotter import SortedSnapshotList from crawlee._utils.byte_size import ByteSize -from crawlee._utils.system import CpuInfo, MemoryInfo +from crawlee._utils.system import CpuInfo, MemoryInfo, get_memory_info from crawlee.configuration import Configuration from crawlee.events import LocalEventManager from crawlee.events._types import Event, EventSystemInfoData @@ -369,3 +375,63 @@ def test_sorted_snapshot_list_add_maintains_order() -> None: prev_time = sorted_list[i - 1].created_at curr_time = snapshot.created_at assert prev_time <= curr_time, f'Items at indices {i - 1} and {i} are not in chronological order' + + +@pytest.mark.parametrize('dynamic_memory', [True, False]) +async def test_dynamic_memory( + *, + default_cpu_info: CpuInfo, + event_manager: LocalEventManager, + dynamic_memory: bool, +) -> None: + """Test dynamic memory scaling scenario where the system-wide memory can change. + + Create two memory snapshots. They have same memory usage, but different available memory. + First snapshot is created with insufficient memory, so it is overloaded. + Second snapshot is created with sufficient memory. + + Based on the Snapshotter configuration, it will either take into account the increased available memory or not. + """ + _initial_memory_info = get_memory_info() + ratio_just_below_system_wide_overload = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD + + memory_mbytes = 0 if dynamic_memory else floor(_initial_memory_info.total_size.to_mb()) + + service_locator.set_event_manager(event_manager) + + async with Snapshotter.from_config( + Configuration(memory_mbytes=memory_mbytes, available_memory_ratio=ratio_just_below_system_wide_overload) + ) as snapshotter: + # Default state, memory usage exactly at the overload threshold -> overloaded, but not system-wide overloaded + memory_infos = [ + # Overloaded sample + MemoryInfo( + total_size=_initial_memory_info.total_size, + current_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload, + system_wide_used_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload, + ), + # Same as first sample, with twice as memory available in the system + MemoryInfo( + total_size=_initial_memory_info.total_size * 2, # Simulate increased total memory + current_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload, + system_wide_used_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload, + ), + ] + + for memory_info in memory_infos: + event_manager.emit( + event=Event.SYSTEM_INFO, + event_data=EventSystemInfoData( + cpu_info=default_cpu_info, + memory_info=memory_info, + ), + ) + + await event_manager.wait_for_all_listeners_to_complete() + + memory_samples = snapshotter.get_memory_sample() + assert len(memory_samples) == 2 + # First sample will be overloaded. + assert memory_samples[0].is_overloaded + # Second sample can reflect the increased available memory based on the configuration used to create Snapshotter + assert memory_samples[1].is_overloaded == (not dynamic_memory)