Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/crawlee/_autoscaling/snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

import bisect
from bisect import insort
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import TYPE_CHECKING, TypeVar, cast
Expand Down Expand Up @@ -31,7 +31,7 @@ class SortedSnapshotList(list[T]):

def add(self, item: T) -> None:
"""Add an item to the list maintaining sorted order by `created_at` using binary search."""
bisect.insort(self, item, key=lambda item: item.created_at)
insort(self, item, key=lambda item: item.created_at)


@docs_group('Autoscaling')
Expand Down Expand Up @@ -252,11 +252,13 @@ def _get_sample(snapshots: list[Snapshot], duration: timedelta | None = None) ->
latest_time = snapshots[-1].created_at
return [snapshot for snapshot in snapshots if latest_time - snapshot.created_at <= duration]

def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
async def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
"""Capture a snapshot of the current CPU usage.

This method does not perform CPU usage measurement. Instead, it just reads the data received through
the `event_data` parameter, which is expected to be supplied by the event manager.
Must be `async` so that the event manager does not schedule it to run in its own thread, which could cause
race conditions in snapshot manipulation (sorting and pruning).

Args:
event_data: System info data from which CPU usage is read.
Expand All @@ -271,11 +273,13 @@ def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
self._prune_snapshots(snapshots, event_data.cpu_info.created_at)
self._cpu_snapshots.add(snapshot)

def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
async def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
"""Capture a snapshot of the current memory usage.

This method does not perform memory usage measurement. Instead, it just reads the data received through
the `event_data` parameter, which is expected to be supplied by the event manager.
Must be `async` so that the event manager does not schedule it to run in its own thread, which could cause
race conditions in snapshot manipulation (sorting and pruning).

Args:
event_data: System info data from which memory usage is read.
Expand All @@ -298,13 +302,15 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
self._memory_snapshots.add(snapshot)
self._evaluate_memory_load(event_data.memory_info.current_size, event_data.memory_info.created_at)

def _snapshot_event_loop(self) -> None:
async def _snapshot_event_loop(self) -> None:
"""Capture a snapshot of the current event loop usage.

This method evaluates the event loop's latency by comparing the expected time between snapshots to the actual
time elapsed since the last snapshot. The delay in the snapshot reflects the time deviation due to event loop
overhead - it's calculated by subtracting the expected interval between snapshots from the actual time elapsed
since the last snapshot. If there's no previous snapshot, the delay is considered zero.
Must be `async` so that the event manager does not schedule it to run in its own thread, which could cause
race conditions in snapshot manipulation (sorting and pruning).
"""
snapshot = EventLoopSnapshot(max_delay=self._max_event_loop_delay, delay=timedelta(seconds=0))
previous_snapshot = self._event_loop_snapshots[-1] if self._event_loop_snapshots else None
Expand All @@ -317,11 +323,13 @@ def _snapshot_event_loop(self) -> None:
self._prune_snapshots(snapshots, snapshot.created_at)
self._event_loop_snapshots.add(snapshot)

def _snapshot_client(self) -> None:
async def _snapshot_client(self) -> None:
"""Capture a snapshot of the current API state by checking for rate limit errors (HTTP 429).

Only errors produced by a 2nd retry of the API call are considered for snapshotting since earlier errors may
just be caused by a random spike in the number of requests and do not necessarily signify API overloading.
Must be `async` so that the event manager does not schedule it to run in its own thread, which could cause
race conditions in snapshot manipulation (sorting and pruning).
"""
client = service_locator.get_storage_client()

Expand Down
36 changes: 21 additions & 15 deletions tests/unit/_autoscaling/test_snapshotter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

import asyncio
import time
from bisect import insort
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Any, cast
from unittest import mock
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -213,9 +216,6 @@ async def test_methods_raise_error_when_not_active() -> None:
assert snapshotter.active is True


@pytest.mark.skip(
reason='Flaky due to snapshot pruning boundary condition, see https://github.com/apify/crawlee-python/issues/1734'
)
async def test_snapshot_pruning_removes_outdated_records(
snapshotter: Snapshotter, event_manager: LocalEventManager, default_memory_info: MemoryInfo
) -> None:
Expand All @@ -225,17 +225,23 @@ async def test_snapshot_pruning_removes_outdated_records(
# Create timestamps for testing
now = datetime.now(timezone.utc)

events_data = [
EventSystemInfoData(
cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)),
memory_info=default_memory_info,
)
for delta in [5, 3, 2, 0]
]

for event_data in events_data:
event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data)
await event_manager.wait_for_all_listeners_to_complete()
def randomly_delayed_insort(*args: Any, **kwargs: Any) -> None:
"""Insert with an injected delay to provoke an otherwise hard-to-reproduce race condition."""
time.sleep(0.05)
return insort(*args, **kwargs)

with mock.patch('crawlee._autoscaling.snapshotter.insort', side_effect=randomly_delayed_insort):
events_data = [
EventSystemInfoData(
cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)),
memory_info=default_memory_info,
)
for delta in [5, 3, 2, 0]
]

for event_data in events_data:
event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data)
await event_manager.wait_for_all_listeners_to_complete()

cpu_snapshots = cast('list[CpuSnapshot]', snapshotter.get_cpu_sample())

Expand Down