diff --git a/src/crawlee/_autoscaling/snapshotter.py b/src/crawlee/_autoscaling/snapshotter.py index 87328f658e..5a3b51d35b 100644 --- a/src/crawlee/_autoscaling/snapshotter.py +++ b/src/crawlee/_autoscaling/snapshotter.py @@ -2,8 +2,8 @@ from __future__ import annotations -import bisect import functools +from bisect import insort from datetime import datetime, timedelta, timezone from logging import getLogger from typing import TYPE_CHECKING, TypeVar, cast @@ -38,7 +38,7 @@ class SortedSnapshotList(list[T]): def add(self, item: T) -> None: """Add an item to the list maintaining sorted order by `created_at` using binary search.""" - bisect.insort(self, item, key=lambda item: item.created_at) + insort(self, item, key=lambda item: item.created_at) @docs_group('Autoscaling') @@ -261,11 +261,13 @@ def _get_sample(snapshots: list[Snapshot], duration: timedelta | None = None) -> latest_time = snapshots[-1].created_at return [snapshot for snapshot in snapshots if latest_time - snapshot.created_at <= duration] - def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None: + async def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None: """Capture a snapshot of the current CPU usage. This method does not perform CPU usage measurement. Instead, it just reads the data received through the `event_data` parameter, which is expected to be supplied by the event manager. + Must be `async` to ensure it is not scheduled to be run in own thread by the event manager, which could cause + race conditions in snapshots manipulation(sorting and pruning). Args: event_data: System info data from which CPU usage is read. @@ -280,11 +282,13 @@ def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None: self._prune_snapshots(snapshots, event_data.cpu_info.created_at) self._cpu_snapshots.add(snapshot) - def _snapshot_memory(self, event_data: EventSystemInfoData) -> None: + async def _snapshot_memory(self, event_data: EventSystemInfoData) -> None: """Capture a snapshot of the current memory usage. This method does not perform memory usage measurement. Instead, it just reads the data received through the `event_data` parameter, which is expected to be supplied by the event manager. + Must be `async` to ensure it is not scheduled to be run in own thread by the event manager, which could cause + race conditions in snapshots manipulation(sorting and pruning). Args: event_data: System info data from which memory usage is read. @@ -339,13 +343,15 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None: max_memory_size=max_memory_size, ) - def _snapshot_event_loop(self) -> None: + async def _snapshot_event_loop(self) -> None: """Capture a snapshot of the current event loop usage. This method evaluates the event loop's latency by comparing the expected time between snapshots to the actual time elapsed since the last snapshot. The delay in the snapshot reflects the time deviation due to event loop overhead - it's calculated by subtracting the expected interval between snapshots from the actual time elapsed since the last snapshot. If there's no previous snapshot, the delay is considered zero. + Must be `async` to ensure it is not scheduled to be run in own thread by the event manager, which could cause + race conditions in snapshots manipulation(sorting and pruning). """ snapshot = EventLoopSnapshot(max_delay=self._max_event_loop_delay, delay=timedelta(seconds=0)) previous_snapshot = self._event_loop_snapshots[-1] if self._event_loop_snapshots else None @@ -358,11 +364,13 @@ def _snapshot_event_loop(self) -> None: self._prune_snapshots(snapshots, snapshot.created_at) self._event_loop_snapshots.add(snapshot) - def _snapshot_client(self) -> None: + async def _snapshot_client(self) -> None: """Capture a snapshot of the current API state by checking for rate limit errors (HTTP 429). Only errors produced by a 2nd retry of the API call are considered for snapshotting since earlier errors may just be caused by a random spike in the number of requests and do not necessarily signify API overloading. + Must be `async` to ensure it is not scheduled to be run in own thread by the event manager, which could cause + race conditions in snapshots manipulation(sorting and pruning). """ client = service_locator.get_storage_client() diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index da657fe8ed..d3057176f0 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -1,10 +1,13 @@ from __future__ import annotations import asyncio +import time +from bisect import insort from datetime import datetime, timedelta, timezone from logging import getLogger from math import floor -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast +from unittest import mock from unittest.mock import MagicMock import pytest @@ -219,9 +222,6 @@ async def test_methods_raise_error_when_not_active() -> None: assert snapshotter.active is True -@pytest.mark.skip( - reason='Flaky due to snapshot pruning boundary condition, see https://github.com/apify/crawlee-python/issues/1734' -) async def test_snapshot_pruning_removes_outdated_records( snapshotter: Snapshotter, event_manager: LocalEventManager, default_memory_info: MemoryInfo ) -> None: @@ -231,17 +231,23 @@ async def test_snapshot_pruning_removes_outdated_records( # Create timestamps for testing now = datetime.now(timezone.utc) - events_data = [ - EventSystemInfoData( - cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)), - memory_info=default_memory_info, - ) - for delta in [5, 3, 2, 0] - ] + def randomly_delayed_insort(*args: Any, **kwargs: Any) -> None: + """Sort with injected delay to provoke otherwise hard to reproduce race condition.""" + time.sleep(0.05) + return insort(*args, **kwargs) - for event_data in events_data: - event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) - await event_manager.wait_for_all_listeners_to_complete() + with mock.patch('crawlee._autoscaling.snapshotter.insort', side_effect=randomly_delayed_insort): + events_data = [ + EventSystemInfoData( + cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)), + memory_info=default_memory_info, + ) + for delta in [3, 2, 5, 0] # Out of order timestamps. Snapshotter can not rely on natural ordering. + ] + + for event_data in events_data: + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) + await event_manager.wait_for_all_listeners_to_complete() cpu_snapshots = cast('list[CpuSnapshot]', snapshotter.get_cpu_sample())