diff --git a/src/apify_client/clients/base/resource_collection_client.py b/src/apify_client/clients/base/resource_collection_client.py index 2e9c6063..dc4b5e78 100644 --- a/src/apify_client/clients/base/resource_collection_client.py +++ b/src/apify_client/clients/base/resource_collection_client.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import Any, Generic, TypeVar +from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Generator, Iterable, Iterator +from typing import Any, Generic, Protocol, TypeVar from apify_client._utils import parse_date_fields, pluck_data from apify_client.clients.base.base_client import BaseClient, BaseClientAsync @@ -51,6 +52,36 @@ def _list(self, **kwargs: Any) -> ListPage: return ListPage(parse_date_fields(pluck_data(response.json()))) + def _list_iterable(self, **kwargs: Any) -> IterableListPage[T]: + """Return object can be awaited or iterated over.""" + chunk_size = kwargs.pop('chunk_size', None) + + list_page = self._list(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)}) + + def iterator() -> Iterator[T]: + current_page = list_page + for item in current_page.items: + yield item + + offset = kwargs.get('offset') or 0 + limit = min(kwargs.get('limit') or current_page.total, current_page.total) + + current_offset = offset + len(current_page.items) + remaining_items = min(current_page.total - offset, limit) - len(current_page.items) + while current_page.items and remaining_items > 0: + new_kwargs = { + **kwargs, + 'offset': current_offset, + 'limit': _min_for_limit_param(remaining_items, chunk_size), + } + current_page = self._list(**new_kwargs) + for item in current_page.items: + yield item + current_offset += len(current_page.items) + remaining_items -= len(current_page.items) + + return IterableListPage[T](list_page, iterator()) + def _create(self, resource: dict) -> dict: response = self.http_client.call( url=self._url(), @@ -84,6 +115,36 @@ async def _list(self, **kwargs: Any) -> ListPage: return ListPage(parse_date_fields(pluck_data(response.json()))) + def _list_iterable(self, **kwargs: Any) -> ListPageProtocolAsync[T]: + """Return object can be awaited or iterated over.""" + chunk_size = kwargs.pop('chunk_size', None) + + list_page_awaitable = self._list(**{**kwargs, 'limit': _min_for_limit_param(kwargs.get('limit'), chunk_size)}) + + async def async_iterator() -> AsyncIterator[T]: + current_page = await list_page_awaitable + for item in current_page.items: + yield item + + offset = kwargs.get('offset') or 0 + limit = min(kwargs.get('limit') or current_page.total, current_page.total) + + current_offset = offset + len(current_page.items) + remaining_items = min(current_page.total - offset, limit) - len(current_page.items) + while current_page.items and remaining_items > 0: + new_kwargs = { + **kwargs, + 'offset': current_offset, + 'limit': _min_for_limit_param(remaining_items, chunk_size), + } + current_page = await self._list(**new_kwargs) + for item in current_page.items: + yield item + current_offset += len(current_page.items) + remaining_items -= len(current_page.items) + + return IterableListPageAsync[T](list_page_awaitable, async_iterator()) + async def _create(self, resource: dict) -> dict: response = await self.http_client.call( url=self._url(), @@ -107,3 +168,76 @@ async def _get_or_create( ) return parse_date_fields(pluck_data(response.json())) + + +class ListPageProtocol(Protocol[T], Iterable[T]): + """Protocol for an object that can be both awaited and asynchronously iterated over.""" + + items: list[T] + """List of returned objects on this page""" + + count: int + """Count of the returned objects on this page""" + + offset: int + """The limit on the number of returned objects offset specified in the API call""" + + limit: int + """The offset of the first object specified in the API call""" + + total: int + """Total number of objects matching the API call criteria""" + + desc: bool + """Whether the listing is descending or not""" + + +class IterableListPage(ListPage[T], Generic[T]): + """Can be called to get ListPage with items or iterated over to get individual items.""" + + def __init__(self, list_page: ListPage[T], iterator: Iterator[T]) -> None: + self.items = list_page.items + self.offset = list_page.offset + self.limit = list_page.limit + self.count = list_page.count + self.total = list_page.total + self.desc = list_page.desc + self._iterator = iterator + + def __iter__(self) -> Iterator[T]: + """Return an iterator over the items from API, possibly doing multiple API calls.""" + return self._iterator + + +class ListPageProtocolAsync(Protocol[T], AsyncIterable[T], Awaitable[ListPage[T]]): + """Protocol for an object that can be both awaited and asynchronously iterated over.""" + + +class IterableListPageAsync(Generic[T]): + """Can be awaited to get ListPage with items or asynchronously iterated over to get individual items.""" + + def __init__(self, awaitable: Awaitable[ListPage[T]], async_iterator: AsyncIterator[T]) -> None: + self._awaitable = awaitable + self._async_iterator = async_iterator + + def __aiter__(self) -> AsyncIterator[T]: + """Return an asynchronous iterator over the items from API, possibly doing multiple API calls.""" + return self._async_iterator + + def __await__(self) -> Generator[Any, Any, ListPage[T]]: + """Return an awaitable that resolves to the ListPage doing exactly one API call.""" + return self._awaitable.__await__() + + +def _min_for_limit_param(a: int | None, b: int | None) -> int | None: + """Return minimum of two limit parameters, treating None or 0 as infinity. Return None for infinity.""" + # API treats 0 as None for limit parameter, in this context API understands 0 as infinity. + if a == 0: + a = None + if b == 0: + b = None + if a is None: + return b + if b is None: + return a + return min(a, b) diff --git a/src/apify_client/clients/resource_clients/actor_collection.py b/src/apify_client/clients/resource_clients/actor_collection.py index 95c7b105..91059a2f 100644 --- a/src/apify_client/clients/resource_clients/actor_collection.py +++ b/src/apify_client/clients/resource_clients/actor_collection.py @@ -7,7 +7,7 @@ from apify_client.clients.resource_clients.actor import get_actor_representation if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class ActorCollectionClient(ResourceCollectionClient): @@ -25,7 +25,7 @@ def list( offset: int | None = None, desc: bool | None = None, sort_by: Literal['createdAt', 'stats.lastRunStartedAt'] | None = 'createdAt', - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the Actors the user has created or used. https://docs.apify.com/api/v2#/reference/actors/actor-collection/get-list-of-actors @@ -40,7 +40,7 @@ def list( Returns: The list of available Actors matching the specified filters. """ - return self._list(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by) + return self._list_iterable(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by) def create( self, @@ -142,7 +142,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'acts') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, my: bool | None = None, @@ -150,7 +150,7 @@ async def list( offset: int | None = None, desc: bool | None = None, sort_by: Literal['createdAt', 'stats.lastRunStartedAt'] | None = 'createdAt', - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the Actors the user has created or used. https://docs.apify.com/api/v2#/reference/actors/actor-collection/get-list-of-actors @@ -165,7 +165,7 @@ async def list( Returns: The list of available Actors matching the specified filters. """ - return await self._list(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by) + return self._list_iterable(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by) async def create( self, diff --git a/src/apify_client/clients/resource_clients/actor_env_var_collection.py b/src/apify_client/clients/resource_clients/actor_env_var_collection.py index 217bdd22..e351d234 100644 --- a/src/apify_client/clients/resource_clients/actor_env_var_collection.py +++ b/src/apify_client/clients/resource_clients/actor_env_var_collection.py @@ -7,7 +7,7 @@ from apify_client.clients.resource_clients.actor_env_var import get_actor_env_var_representation if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class ActorEnvVarCollectionClient(ResourceCollectionClient): @@ -17,7 +17,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'env-vars') super().__init__(*args, resource_path=resource_path, **kwargs) - def list(self) -> ListPage[dict]: + def list(self) -> ListPageProtocol[dict]: """List the available actor environment variables. https://docs.apify.com/api/v2#/reference/actors/environment-variable-collection/get-list-of-environment-variables @@ -25,7 +25,7 @@ def list(self) -> ListPage[dict]: Returns: The list of available actor environment variables. """ - return self._list() + return self._list_iterable() def create( self, @@ -62,7 +62,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'env-vars') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list(self) -> ListPage[dict]: + def list(self) -> ListPageProtocolAsync[dict]: """List the available actor environment variables. https://docs.apify.com/api/v2#/reference/actors/environment-variable-collection/get-list-of-environment-variables @@ -70,7 +70,7 @@ async def list(self) -> ListPage[dict]: Returns: The list of available actor environment variables. """ - return await self._list() + return self._list_iterable() async def create( self, diff --git a/src/apify_client/clients/resource_clients/actor_version_collection.py b/src/apify_client/clients/resource_clients/actor_version_collection.py index 6c3b1b5d..eeaf36c2 100644 --- a/src/apify_client/clients/resource_clients/actor_version_collection.py +++ b/src/apify_client/clients/resource_clients/actor_version_collection.py @@ -9,7 +9,7 @@ if TYPE_CHECKING: from apify_shared.consts import ActorSourceType - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class ActorVersionCollectionClient(ResourceCollectionClient): @@ -19,7 +19,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'versions') super().__init__(*args, resource_path=resource_path, **kwargs) - def list(self) -> ListPage[dict]: + def list(self) -> ListPageProtocol[dict]: """List the available Actor versions. https://docs.apify.com/api/v2#/reference/actors/version-collection/get-list-of-versions @@ -27,7 +27,7 @@ def list(self) -> ListPage[dict]: Returns: The list of available Actor versions. """ - return self._list() + return self._list_iterable() def create( self, @@ -88,7 +88,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'versions') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list(self) -> ListPage[dict]: + def list(self) -> ListPageProtocolAsync[dict]: """List the available Actor versions. https://docs.apify.com/api/v2#/reference/actors/version-collection/get-list-of-versions @@ -96,7 +96,7 @@ async def list(self) -> ListPage[dict]: Returns: The list of available Actor versions. """ - return await self._list() + return self._list_iterable() async def create( self, diff --git a/src/apify_client/clients/resource_clients/build_collection.py b/src/apify_client/clients/resource_clients/build_collection.py index 4eada958..4321c806 100644 --- a/src/apify_client/clients/resource_clients/build_collection.py +++ b/src/apify_client/clients/resource_clients/build_collection.py @@ -5,7 +5,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class BuildCollectionClient(ResourceCollectionClient): @@ -21,7 +21,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List all Actor builds. List all Actor builds, either of a single Actor, or all user's Actors, depending on where this client @@ -38,7 +38,7 @@ def list( Returns: The retrieved Actor builds. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) class BuildCollectionClientAsync(ResourceCollectionClientAsync): @@ -48,13 +48,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'actor-builds') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List all Actor builds. List all Actor builds, either of a single Actor, or all user's Actors, depending on where this client @@ -71,4 +71,4 @@ async def list( Returns: The retrieved Actor builds. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) diff --git a/src/apify_client/clients/resource_clients/dataset_collection.py b/src/apify_client/clients/resource_clients/dataset_collection.py index 602497ce..14251c86 100644 --- a/src/apify_client/clients/resource_clients/dataset_collection.py +++ b/src/apify_client/clients/resource_clients/dataset_collection.py @@ -6,7 +6,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class DatasetCollectionClient(ResourceCollectionClient): @@ -23,7 +23,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available datasets. https://docs.apify.com/api/v2#/reference/datasets/dataset-collection/get-list-of-datasets @@ -37,7 +37,7 @@ def list( Returns: The list of available datasets matching the specified filters. """ - return self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable(unnamed=unnamed, limit=limit, offset=offset, desc=desc) def get_or_create(self, *, name: str | None = None, schema: dict | None = None) -> dict: """Retrieve a named dataset, or create a new one when it doesn't exist. @@ -61,14 +61,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'datasets') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, unnamed: bool | None = None, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available datasets. https://docs.apify.com/api/v2#/reference/datasets/dataset-collection/get-list-of-datasets @@ -82,7 +82,7 @@ async def list( Returns: The list of available datasets matching the specified filters. """ - return await self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable(unnamed=unnamed, limit=limit, offset=offset, desc=desc) async def get_or_create( self, diff --git a/src/apify_client/clients/resource_clients/key_value_store_collection.py b/src/apify_client/clients/resource_clients/key_value_store_collection.py index 8af38903..bba333ec 100644 --- a/src/apify_client/clients/resource_clients/key_value_store_collection.py +++ b/src/apify_client/clients/resource_clients/key_value_store_collection.py @@ -6,7 +6,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class KeyValueStoreCollectionClient(ResourceCollectionClient): @@ -23,7 +23,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available key-value stores. https://docs.apify.com/api/v2#/reference/key-value-stores/store-collection/get-list-of-key-value-stores @@ -37,7 +37,7 @@ def list( Returns: The list of available key-value stores matching the specified filters. """ - return self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable(unnamed=unnamed, limit=limit, offset=offset, desc=desc) def get_or_create( self, @@ -66,14 +66,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'key-value-stores') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, unnamed: bool | None = None, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available key-value stores. https://docs.apify.com/api/v2#/reference/key-value-stores/store-collection/get-list-of-key-value-stores @@ -87,7 +87,7 @@ async def list( Returns: The list of available key-value stores matching the specified filters. """ - return await self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable(unnamed=unnamed, limit=limit, offset=offset, desc=desc) async def get_or_create( self, diff --git a/src/apify_client/clients/resource_clients/request_queue_collection.py b/src/apify_client/clients/resource_clients/request_queue_collection.py index f2ee80bb..95ec0672 100644 --- a/src/apify_client/clients/resource_clients/request_queue_collection.py +++ b/src/apify_client/clients/resource_clients/request_queue_collection.py @@ -5,7 +5,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class RequestQueueCollectionClient(ResourceCollectionClient): @@ -22,7 +22,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available request queues. https://docs.apify.com/api/v2#/reference/request-queues/queue-collection/get-list-of-request-queues @@ -36,7 +36,7 @@ def list( Returns: The list of available request queues matching the specified filters. """ - return self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable(unnamed=unnamed, limit=limit, offset=offset, desc=desc) def get_or_create(self, *, name: str | None = None) -> dict: """Retrieve a named request queue, or create a new one when it doesn't exist. @@ -59,14 +59,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'request-queues') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, unnamed: bool | None = None, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available request queues. https://docs.apify.com/api/v2#/reference/request-queues/queue-collection/get-list-of-request-queues @@ -80,7 +80,7 @@ async def list( Returns: The list of available request queues matching the specified filters. """ - return await self._list(unnamed=unnamed, limit=limit, offset=offset, desc=desc) + return self._list_iterable(unnamed=unnamed, limit=limit, offset=offset, desc=desc) async def get_or_create(self, *, name: str | None = None) -> dict: """Retrieve a named request queue, or create a new one when it doesn't exist. diff --git a/src/apify_client/clients/resource_clients/run_collection.py b/src/apify_client/clients/resource_clients/run_collection.py index 77c5bc38..8fdbf2dc 100644 --- a/src/apify_client/clients/resource_clients/run_collection.py +++ b/src/apify_client/clients/resource_clients/run_collection.py @@ -10,7 +10,7 @@ from apify_shared.consts import ActorJobStatus - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class RunCollectionClient(ResourceCollectionClient): @@ -29,7 +29,7 @@ def list( status: ActorJobStatus | list[ActorJobStatus] | None = None, started_before: str | datetime | None = None, started_after: str | datetime | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List all Actor runs. List all Actor runs, either of a single Actor, or all user's Actors, depending on where this client @@ -54,7 +54,7 @@ def list( else: status_param = maybe_extract_enum_member_value(status) - return self._list( + return self._list_iterable( limit=limit, offset=offset, desc=desc, @@ -71,7 +71,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'actor-runs') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, @@ -80,7 +80,7 @@ async def list( status: ActorJobStatus | list[ActorJobStatus] | None = None, started_before: str | datetime | None = None, started_after: str | datetime | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List all Actor runs. List all Actor runs, either of a single Actor, or all user's Actors, depending on where this client @@ -105,7 +105,7 @@ async def list( else: status_param = maybe_extract_enum_member_value(status) - return await self._list( + return self._list_iterable( limit=limit, offset=offset, desc=desc, diff --git a/src/apify_client/clients/resource_clients/schedule_collection.py b/src/apify_client/clients/resource_clients/schedule_collection.py index e8386edf..d767a366 100644 --- a/src/apify_client/clients/resource_clients/schedule_collection.py +++ b/src/apify_client/clients/resource_clients/schedule_collection.py @@ -7,7 +7,7 @@ from apify_client.clients.resource_clients.schedule import _get_schedule_representation if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class ScheduleCollectionClient(ResourceCollectionClient): @@ -23,7 +23,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available schedules. https://docs.apify.com/api/v2#/reference/schedules/schedules-collection/get-list-of-schedules @@ -36,7 +36,7 @@ def list( Returns: The list of available schedules matching the specified filters. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) def create( self, @@ -93,13 +93,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'schedules') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available schedules. https://docs.apify.com/api/v2#/reference/schedules/schedules-collection/get-list-of-schedules @@ -112,7 +112,7 @@ async def list( Returns: The list of available schedules matching the specified filters. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) async def create( self, diff --git a/src/apify_client/clients/resource_clients/store_collection.py b/src/apify_client/clients/resource_clients/store_collection.py index f04200a0..96c33214 100644 --- a/src/apify_client/clients/resource_clients/store_collection.py +++ b/src/apify_client/clients/resource_clients/store_collection.py @@ -5,7 +5,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class StoreCollectionClient(ResourceCollectionClient): @@ -25,7 +25,7 @@ def list( category: str | None = None, username: str | None = None, pricing_model: str | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List Actors in Apify store. https://docs.apify.com/api/v2/#/reference/store/store-actors-collection/get-list-of-actors-in-store @@ -43,7 +43,7 @@ def list( Returns: The list of available tasks matching the specified filters. """ - return self._list( + return self._list_iterable( limit=limit, offset=offset, search=search, @@ -61,7 +61,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'store') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, @@ -71,7 +71,7 @@ async def list( category: str | None = None, username: str | None = None, pricing_model: str | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List Actors in Apify store. https://docs.apify.com/api/v2/#/reference/store/store-actors-collection/get-list-of-actors-in-store @@ -89,7 +89,7 @@ async def list( Returns: The list of available tasks matching the specified filters. """ - return await self._list( + return self._list_iterable( limit=limit, offset=offset, search=search, diff --git a/src/apify_client/clients/resource_clients/task_collection.py b/src/apify_client/clients/resource_clients/task_collection.py index 0f8fe188..6d3520ab 100644 --- a/src/apify_client/clients/resource_clients/task_collection.py +++ b/src/apify_client/clients/resource_clients/task_collection.py @@ -7,7 +7,7 @@ from apify_client.clients.resource_clients.task import get_task_representation if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class TaskCollectionClient(ResourceCollectionClient): @@ -23,7 +23,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available tasks. https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks @@ -36,7 +36,7 @@ def list( Returns: The list of available tasks matching the specified filters. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) def create( self, @@ -114,13 +114,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'actor-tasks') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available tasks. https://docs.apify.com/api/v2#/reference/actor-tasks/task-collection/get-list-of-tasks @@ -133,7 +133,7 @@ async def list( Returns: The list of available tasks matching the specified filters. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) async def create( self, diff --git a/src/apify_client/clients/resource_clients/webhook_collection.py b/src/apify_client/clients/resource_clients/webhook_collection.py index 2add4361..1174f717 100644 --- a/src/apify_client/clients/resource_clients/webhook_collection.py +++ b/src/apify_client/clients/resource_clients/webhook_collection.py @@ -9,7 +9,7 @@ if TYPE_CHECKING: from apify_shared.consts import WebhookEventType - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class WebhookCollectionClient(ResourceCollectionClient): @@ -25,7 +25,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List the available webhooks. https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/get-list-of-webhooks @@ -38,7 +38,7 @@ def list( Returns: The list of available webhooks matching the specified filters. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) def create( self, @@ -103,13 +103,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'webhooks') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List the available webhooks. https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/get-list-of-webhooks @@ -122,7 +122,7 @@ async def list( Returns: The list of available webhooks matching the specified filters. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) async def create( self, diff --git a/src/apify_client/clients/resource_clients/webhook_dispatch_collection.py b/src/apify_client/clients/resource_clients/webhook_dispatch_collection.py index 60ac1df1..8ff4ce90 100644 --- a/src/apify_client/clients/resource_clients/webhook_dispatch_collection.py +++ b/src/apify_client/clients/resource_clients/webhook_dispatch_collection.py @@ -5,7 +5,7 @@ from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync if TYPE_CHECKING: - from apify_client.clients.base.resource_collection_client import ListPage + from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync class WebhookDispatchCollectionClient(ResourceCollectionClient): @@ -21,7 +21,7 @@ def list( limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocol[dict]: """List all webhook dispatches of a user. https://docs.apify.com/api/v2#/reference/webhook-dispatches/webhook-dispatches-collection/get-list-of-webhook-dispatches @@ -34,7 +34,7 @@ def list( Returns: The retrieved webhook dispatches of a user. """ - return self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) class WebhookDispatchCollectionClientAsync(ResourceCollectionClientAsync): @@ -44,13 +44,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: resource_path = kwargs.pop('resource_path', 'webhook-dispatches') super().__init__(*args, resource_path=resource_path, **kwargs) - async def list( + def list( self, *, limit: int | None = None, offset: int | None = None, desc: bool | None = None, - ) -> ListPage[dict]: + ) -> ListPageProtocolAsync[dict]: """List all webhook dispatches of a user. https://docs.apify.com/api/v2#/reference/webhook-dispatches/webhook-dispatches-collection/get-list-of-webhook-dispatches @@ -63,4 +63,4 @@ async def list( Returns: The retrieved webhook dispatches of a user. """ - return await self._list(limit=limit, offset=offset, desc=desc) + return self._list_iterable(limit=limit, offset=offset, desc=desc) diff --git a/tests/unit/test_client_pagination.py b/tests/unit/test_client_pagination.py new file mode 100644 index 00000000..0c449c2c --- /dev/null +++ b/tests/unit/test_client_pagination.py @@ -0,0 +1,267 @@ +import dataclasses +from typing import Any, Literal, TypeAlias +from unittest import mock +from unittest.mock import Mock + +import pytest +from _pytest.mark import ParameterSet + +from apify_client import ApifyClient, ApifyClientAsync +from apify_client.clients import ( + ActorCollectionClient, + ActorCollectionClientAsync, + BaseClient, + BaseClientAsync, + BuildCollectionClient, + BuildCollectionClientAsync, + DatasetCollectionClient, + DatasetCollectionClientAsync, + KeyValueStoreCollectionClient, + KeyValueStoreCollectionClientAsync, + RequestQueueCollectionClient, + RequestQueueCollectionClientAsync, + RunCollectionClient, + RunCollectionClientAsync, + ScheduleCollectionClient, + ScheduleCollectionClientAsync, + StoreCollectionClient, + StoreCollectionClientAsync, + TaskCollectionClient, + TaskCollectionClientAsync, + WebhookCollectionClient, + WebhookCollectionClientAsync, + WebhookDispatchCollectionClient, + WebhookDispatchCollectionClientAsync, +) + +CollectionClientAsync: TypeAlias = ( + ActorCollectionClientAsync + | BuildCollectionClientAsync + | RunCollectionClientAsync + | ScheduleCollectionClientAsync + | TaskCollectionClientAsync + | WebhookCollectionClientAsync + | WebhookDispatchCollectionClientAsync + | DatasetCollectionClientAsync + | KeyValueStoreCollectionClientAsync + | RequestQueueCollectionClientAsync + | StoreCollectionClientAsync +) + +CollectionClient: TypeAlias = ( + ActorCollectionClient + | BuildCollectionClient + | RunCollectionClient + | ScheduleCollectionClient + | TaskCollectionClient + | WebhookCollectionClient + | WebhookDispatchCollectionClient + | DatasetCollectionClient + | KeyValueStoreCollectionClient + | RequestQueueCollectionClient + | StoreCollectionClient +) + + +def create_items(start: int, end: int) -> list[dict[str, int]]: + """Create list of test items of specified range.""" + step = -1 if end < start else 1 + return [{'id': i, 'key': i} for i in range(start, end, step)] + + +def mocked_api_pagination_logic(*_: Any, **kwargs: Any) -> dict: + """This function is a placeholder representing the mocked API pagination logic. + + It simulates paginated responses from an API only to a limited extend to test iteration logic in client. + Returned items are only placeholders that enable keeping track of their index on platform. + + There are 2500 normal items in the collection and additional 100 extra items. + Items are simple objects with incrementing attributes for easy verification. + """ + params = kwargs.get('params', {}) + normal_items = 2500 + extra_items = 100 # additional items, for example unnamed + max_items_per_page = 1000 + + total_items = (normal_items + extra_items) if params.get('unnamed') else normal_items + offset = params.get('offset') or 0 + limit = params.get('limit') or 0 + assert offset >= 0, 'Invalid offset send to API' + assert limit >= 0, 'Invalid limit send to API' + + # Ordered all items in the mocked platform. + items = create_items(total_items, 0) if params.get('desc', False) else create_items(0, total_items) + lower_index = min(offset, total_items) + upper_index = min(offset + (limit or total_items), total_items) + count = min(upper_index - lower_index, max_items_per_page) + + response = Mock() + response.json = lambda: { + 'data': { + 'total': total_items, + 'count': count, + 'offset': offset, + 'limit': limit or count, + 'desc': params.get('desc', False), + 'items': items[lower_index : min(upper_index, lower_index + max_items_per_page)], + } + } + + return response + + +@dataclasses.dataclass +class TestCase: + """Class representing a single test case for pagination tests.""" + + id: str + inputs: dict + expected_items: list[dict[str, int]] + supported_clients: set[str] + + def __hash__(self) -> int: + return hash(self.id) + + def supports(self, client: BaseClient | BaseClientAsync) -> bool: + """Check whether the given client implements functionality tested by this test.""" + return client.__class__.__name__.replace('Async', '') in self.supported_clients + + +# Prepare supported testcases for different clients +COLLECTION_CLIENTS = { + 'ActorCollectionClient', + 'BuildCollectionClient', + 'RunCollectionClient', + 'ScheduleCollectionClient', + 'TaskCollectionClient', + 'WebhookCollectionClient', + 'WebhookDispatchCollectionClient', + 'DatasetCollectionClient', + 'KeyValueStoreCollectionClient', + 'RequestQueueCollectionClient', + 'StoreCollectionClient', +} + +NO_OPTIONS_CLIENTS = { + 'ActorEnvVarCollectionClient', + 'ActorVersionCollectionClient', +} + +STORAGE_CLIENTS = { + 'DatasetClient', + 'KeyValueStoreClient', + 'RequestQueueClient', +} + +ALL_CLIENTS = COLLECTION_CLIENTS | NO_OPTIONS_CLIENTS | STORAGE_CLIENTS + +TEST_CASES = ( + TestCase('No options', {}, create_items(0, 2500), ALL_CLIENTS), + TestCase('Limit', {'limit': 1100}, create_items(0, 1100), ALL_CLIENTS - NO_OPTIONS_CLIENTS), + TestCase('Out of range limit', {'limit': 3000}, create_items(0, 2500), ALL_CLIENTS - NO_OPTIONS_CLIENTS), + TestCase('Offset', {'offset': 1000}, create_items(1000, 2500), ALL_CLIENTS - NO_OPTIONS_CLIENTS), + TestCase( + 'Offset and limit', {'offset': 1000, 'limit': 1100}, create_items(1000, 2100), ALL_CLIENTS - NO_OPTIONS_CLIENTS + ), + TestCase('Out of range offset', {'offset': 3000}, [], ALL_CLIENTS - NO_OPTIONS_CLIENTS), + TestCase( + 'Offset, limit, descending', + {'offset': 1000, 'limit': 1100, 'desc': True}, + create_items(1500, 400), + ALL_CLIENTS - NO_OPTIONS_CLIENTS - {'StoreCollectionClient'}, + ), + TestCase( + 'Offset, limit, descending, unnamed', + {'offset': 50, 'limit': 1100, 'desc': True, 'unnamed': True}, + create_items(2550, 1450), + {'DatasetCollectionClient', 'KeyValueStoreCollectionClient', 'RequestQueueCollectionClient'}, + ), + TestCase( + 'Offset, limit, descending, chunkSize', + {'offset': 50, 'limit': 1100, 'desc': True, 'chunk_size': 100}, + create_items(1500, 400), + {'DatasetClient'}, + ), + TestCase('Exclusive start key', {'exclusive_start_key': 1000}, create_items(1001, 2500), {'KeyValueStoreClient'}), + TestCase('Exclusive start id', {'exclusive_start_id': 1000}, create_items(1001, 2500), {'RequestQueueClient'}), +) + + +def generate_test_params( + client_set: Literal['collection', 'kvs', 'rq', 'dataset'], *, async_clients: bool = False +) -> list[ParameterSet]: + """Generate list of ParameterSets for parametrized tests. + + Different clients support different options and thus different scenarios. + """ + + client = ApifyClientAsync(token='') if async_clients else ApifyClient(token='') + + # This is tuple instead of set because pytest-xdist + # https://pytest-xdist.readthedocs.io/en/stable/known-limitations.html#order-and-amount-of-test-must-be-consistent + clients: tuple[BaseClient | BaseClientAsync, ...] + + match client_set: + case 'collection': + clients = ( + client.actors(), + client.schedules(), + client.tasks(), + client.webhooks(), + client.webhook_dispatches(), + client.store(), + client.datasets(), + client.key_value_stores(), + client.request_queues(), + client.actor('some-id').builds(), + client.actor('some-id').runs(), + client.actor('some-id').versions(), + client.actor('some-id').version('some-version').env_vars(), + ) + case 'kvs': + clients = (client.key_value_store('some-id'),) + case 'rq': + clients = (client.request_queue('some-id'),) + case 'dataset': + clients = (client.dataset('some-id'),) + case _: + raise ValueError(f'Unknown client set: {client_set}') + + return [ + pytest.param( + test_case.inputs, test_case.expected_items, client, id=f'{client.__class__.__name__}:{test_case.id}' + ) + for test_case in TEST_CASES + for client in clients + if test_case.supports(client) + ] + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client'), generate_test_params(client_set='collection', async_clients=True) +) +async def test_client_list_iterable_async( + client: CollectionClientAsync, inputs: dict, expected_items: list[dict[str, int]] +) -> None: + with mock.patch.object(client.http_client, 'call', side_effect=mocked_api_pagination_logic): + returned_items = [item async for item in client.list(**inputs)] + + if inputs == {}: + list_response = await client.list(**inputs) + assert len(returned_items) == list_response.total + + assert returned_items == expected_items + + +@pytest.mark.parametrize( + ('inputs', 'expected_items', 'client'), generate_test_params(client_set='collection', async_clients=False) +) +def test_client_list_iterable(client: CollectionClient, inputs: dict, expected_items: list[dict[str, int]]) -> None: + with mock.patch.object(client.http_client, 'call', side_effect=mocked_api_pagination_logic): + returned_items = [item for item in client.list(**inputs)] # noqa: C416 list needed for assertion + + if inputs == {}: + list_response = client.list(**inputs) + assert len(returned_items) == list_response.total + + assert returned_items == expected_items