Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/crawlee/_autoscaling/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Annotated

from pydantic import Field
from pydantic.dataclasses import dataclass as pydantic_dataclass

if TYPE_CHECKING:
from crawlee._utils.byte_size import ByteSize


SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97


Expand Down Expand Up @@ -167,3 +169,10 @@ def is_overloaded(self) -> bool:


Snapshot = MemorySnapshot | CpuSnapshot | EventLoopSnapshot | ClientSnapshot


@pydantic_dataclass
class Ratio:
"""Represents ratio of memory."""

value: Annotated[float, Field(gt=0.0, le=1.0)]
80 changes: 62 additions & 18 deletions src/crawlee/_autoscaling/snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
from __future__ import annotations

import bisect
import functools
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import TYPE_CHECKING, TypeVar, cast

from crawlee import service_locator
from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Snapshot
from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Ratio, Snapshot
from crawlee._utils.byte_size import ByteSize
from crawlee._utils.context import ensure_context
from crawlee._utils.docs import docs_group
from crawlee._utils.recurring_task import RecurringTask
from crawlee._utils.system import MemoryInfo, get_memory_info
from crawlee._utils.system import MemoryInfo, MemoryUsageInfo, get_memory_info
from crawlee.events._types import Event, EventSystemInfoData

if TYPE_CHECKING:
Expand All @@ -26,6 +27,12 @@
T = TypeVar('T', bound=Snapshot)


@functools.lru_cache
def _warn_once(warning_message: str) -> None:
"""Log a warning message only once."""
logger.warning(warning_message)


class SortedSnapshotList(list[T]):
"""A list that maintains sorted order by `created_at` attribute for snapshot objects."""

Expand Down Expand Up @@ -69,7 +76,7 @@ def __init__(
max_used_memory_ratio: float,
max_event_loop_delay: timedelta,
max_client_errors: int,
max_memory_size: ByteSize,
max_memory_size: ByteSize | Ratio,
) -> None:
"""Initialize a new instance.

Expand All @@ -85,7 +92,9 @@ def __init__(
value, the event loop is considered overloaded.
max_client_errors: Sets the maximum number of client errors (HTTP 429). When the number of client errors
is higher than the provided number, the client is considered overloaded.
max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`.
max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`. When of type
`ByteSize` then it is used as fixed memory size. When of type `Ratio` then it allows for dynamic memory
scaling based on the available system memory.
"""
self._max_used_cpu_ratio = max_used_cpu_ratio
self._max_used_memory_ratio = max_used_memory_ratio
Expand Down Expand Up @@ -121,7 +130,7 @@ def from_config(cls, config: Configuration | None = None) -> Snapshotter:
max_memory_size = (
ByteSize.from_mb(config.memory_mbytes)
if config.memory_mbytes
else ByteSize(int(get_memory_info().total_size.bytes * config.available_memory_ratio))
else Ratio(value=config.available_memory_ratio)
)

return cls(
Expand Down Expand Up @@ -280,23 +289,55 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
Args:
event_data: System info data from which memory usage is read.
"""
match event_data.memory_info, self._max_memory_size:
case MemoryInfo() as memory_info, Ratio() as ratio:
max_memory_size = memory_info.total_size * ratio.value
system_wide_used_size = memory_info.system_wide_used_size
system_wide_memory_size = memory_info.total_size

case MemoryUsageInfo(), Ratio() as ratio:
# This is just hypothetical case, that will most likely not happen in practice.
# `LocalEventManager` should always provide `MemoryInfo` in the event data.
# When running on Apify, `self._max_memory_size` is always `ByteSize`, not `Ratio`.
_warn_once(
'It is recommended that a custom implementation of `LocalEventManager` emits `SYSTEM_INFO` events '
'with `MemoryInfo` and not just `MemoryUsageInfo`.'
)
max_memory_size = get_memory_info().total_size * ratio.value
system_wide_used_size = None
system_wide_memory_size = None

case MemoryInfo() as memory_info, ByteSize() as byte_size:
max_memory_size = byte_size
system_wide_used_size = memory_info.system_wide_used_size
system_wide_memory_size = memory_info.total_size

case MemoryUsageInfo(), ByteSize() as byte_size:
max_memory_size = byte_size
system_wide_used_size = None
system_wide_memory_size = None

case _, _:
raise NotImplementedError('Unsupported combination of memory info and max memory size types.')

snapshot = MemorySnapshot(
current_size=event_data.memory_info.current_size,
max_memory_size=self._max_memory_size,
max_memory_size=max_memory_size,
max_used_memory_ratio=self._max_used_memory_ratio,
created_at=event_data.memory_info.created_at,
system_wide_used_size=None,
system_wide_memory_size=None,
system_wide_used_size=system_wide_used_size,
system_wide_memory_size=system_wide_memory_size,
)

if isinstance(memory_info := event_data.memory_info, MemoryInfo):
snapshot.system_wide_used_size = memory_info.system_wide_used_size
snapshot.system_wide_memory_size = memory_info.total_size

snapshots = cast('list[Snapshot]', self._memory_snapshots)
self._prune_snapshots(snapshots, snapshot.created_at)
self._memory_snapshots.add(snapshot)
self._evaluate_memory_load(event_data.memory_info.current_size, event_data.memory_info.created_at)

self._evaluate_memory_load(
event_data.memory_info.current_size,
event_data.memory_info.created_at,
max_memory_size=max_memory_size,
)

def _snapshot_event_loop(self) -> None:
"""Capture a snapshot of the current event loop usage.
Expand Down Expand Up @@ -364,27 +405,30 @@ def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None:
else:
snapshots.clear()

def _evaluate_memory_load(self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime) -> None:
def _evaluate_memory_load(
self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime, max_memory_size: ByteSize
) -> None:
"""Evaluate and logs critical memory load conditions based on the system information.

Args:
current_memory_usage_size: The current memory usage.
snapshot_timestamp: The time at which the memory snapshot was taken.
max_memory_size: The maximum memory size to be used for evaluation.
"""
# Check if the warning has been logged recently to avoid spamming
if snapshot_timestamp < self._timestamp_of_last_memory_warning + self._MEMORY_WARNING_COOLDOWN_PERIOD:
return

threshold_memory_size = self._max_used_memory_ratio * self._max_memory_size
buffer_memory_size = self._max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO
threshold_memory_size = self._max_used_memory_ratio * max_memory_size
buffer_memory_size = max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO
overload_memory_threshold_size = threshold_memory_size + buffer_memory_size

# Log a warning if current memory usage exceeds the critical overload threshold
if current_memory_usage_size > overload_memory_threshold_size:
memory_usage_percentage = round((current_memory_usage_size.bytes / self._max_memory_size.bytes) * 100)
memory_usage_percentage = round((current_memory_usage_size.bytes / max_memory_size.bytes) * 100)
logger.warning(
f'Memory is critically overloaded. Using {current_memory_usage_size} of '
f'{self._max_memory_size} ({memory_usage_percentage}%). '
f'{max_memory_size} ({memory_usage_percentage}%). '
'Consider increasing available memory.'
)
self._timestamp_of_last_memory_warning = snapshot_timestamp
7 changes: 5 additions & 2 deletions src/crawlee/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,14 @@ class Configuration(BaseSettings):
validation_alias=AliasChoices(
'apify_available_memory_ratio',
'crawlee_available_memory_ratio',
)
),
gt=0.0,
le=1.0,
),
] = 0.25
"""The maximum proportion of system memory to use. If `memory_mbytes` is not provided, this ratio is used to
calculate the maximum memory. This option is utilized by the `Snapshotter`."""
calculate the maximum memory. This option is utilized by the `Snapshotter` and supports the dynamic system memory
scaling."""

storage_dir: Annotated[
str,
Expand Down
70 changes: 68 additions & 2 deletions tests/unit/_autoscaling/test_snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@
import asyncio
from datetime import datetime, timedelta, timezone
from logging import getLogger
from math import floor
from typing import TYPE_CHECKING, cast
from unittest.mock import MagicMock

import pytest

from crawlee import service_locator
from crawlee._autoscaling import Snapshotter
from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, MemorySnapshot
from crawlee._autoscaling._types import (
SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD,
ClientSnapshot,
CpuSnapshot,
MemorySnapshot,
)
from crawlee._autoscaling.snapshotter import SortedSnapshotList
from crawlee._utils.byte_size import ByteSize
from crawlee._utils.system import CpuInfo, MemoryInfo
from crawlee._utils.system import CpuInfo, MemoryInfo, get_memory_info
from crawlee.configuration import Configuration
from crawlee.events import LocalEventManager
from crawlee.events._types import Event, EventSystemInfoData
Expand Down Expand Up @@ -369,3 +375,63 @@ def test_sorted_snapshot_list_add_maintains_order() -> None:
prev_time = sorted_list[i - 1].created_at
curr_time = snapshot.created_at
assert prev_time <= curr_time, f'Items at indices {i - 1} and {i} are not in chronological order'


@pytest.mark.parametrize('dynamic_memory', [True, False])
async def test_dynamic_memory(
*,
default_cpu_info: CpuInfo,
event_manager: LocalEventManager,
dynamic_memory: bool,
) -> None:
"""Test dynamic memory scaling scenario where the system-wide memory can change.

Create two memory snapshots. They have same memory usage, but different available memory.
First snapshot is created with insufficient memory, so it is overloaded.
Second snapshot is created with sufficient memory.

Based on the Snapshotter configuration, it will either take into account the increased available memory or not.
"""
_initial_memory_info = get_memory_info()
ratio_just_below_system_wide_overload = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD

memory_mbytes = 0 if dynamic_memory else floor(_initial_memory_info.total_size.to_mb())

service_locator.set_event_manager(event_manager)

async with Snapshotter.from_config(
Configuration(memory_mbytes=memory_mbytes, available_memory_ratio=ratio_just_below_system_wide_overload)
) as snapshotter:
# Default state, memory usage exactly at the overload threshold -> overloaded, but not system-wide overloaded
memory_infos = [
# Overloaded sample
MemoryInfo(
total_size=_initial_memory_info.total_size,
current_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload,
system_wide_used_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload,
),
# Same as first sample, with twice as memory available in the system
MemoryInfo(
total_size=_initial_memory_info.total_size * 2, # Simulate increased total memory
current_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload,
system_wide_used_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload,
),
]

for memory_info in memory_infos:
event_manager.emit(
event=Event.SYSTEM_INFO,
event_data=EventSystemInfoData(
cpu_info=default_cpu_info,
memory_info=memory_info,
),
)

await event_manager.wait_for_all_listeners_to_complete()

memory_samples = snapshotter.get_memory_sample()
assert len(memory_samples) == 2
# First sample will be overloaded.
assert memory_samples[0].is_overloaded
# Second sample can reflect the increased available memory based on the configuration used to create Snapshotter
assert memory_samples[1].is_overloaded == (not dynamic_memory)
Loading