Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 135 additions & 1 deletion src/apify_client/clients/base/resource_collection_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import Any, Generic, TypeVar
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Generator, Iterable, Iterator
from typing import Any, Generic, Protocol, TypeVar

from apify_client._utils import parse_date_fields, pluck_data
from apify_client.clients.base.base_client import BaseClient, BaseClientAsync
Expand Down Expand Up @@ -51,6 +52,36 @@ def _list(self, **kwargs: Any) -> ListPage:

return ListPage(parse_date_fields(pluck_data(response.json())))

def _list_iterable(self, **kwargs: Any) -> IterableListPage[T]:
    """List the collection and return a page that can also be iterated over item by item.

    The first page is fetched right away through `_list` and its data is exposed on the returned
    object. Iterating over the returned object yields the items of that first page and then keeps
    requesting following pages until the requested `limit` or the total reported by the first page
    is reached, or until a page comes back empty.

    Args:
        **kwargs: Query parameters forwarded to `_list`. `offset` and `limit` additionally drive
            the paging. The optional `chunk_size` is removed from the parameters and only caps
            the `limit` of each individual `_list` call.

    Returns:
        The first page wrapped so that it is iterable over the listed items.
    """
    # `chunk_size` is consumed here, so it is taken out before the parameters reach `_list`.
    chunk_size = kwargs.pop('chunk_size', None)

    first_limit = _min_for_limit_param(kwargs.get('limit'), chunk_size)
    first_page = self._list(**{**kwargs, 'limit': first_limit})

    def iterator() -> Iterator[T]:
        page = first_page
        yield from page.items

        start_offset = kwargs.get('offset') or 0
        # A missing or zero `limit` falls back to the total reported by the first page.
        wanted = min(kwargs.get('limit') or page.total, page.total)

        fetched = len(page.items)
        next_offset = start_offset + fetched
        left_to_fetch = min(page.total - start_offset, wanted) - fetched

        # An empty page stops the iteration even when the bookkeeping still expects more items.
        while page.items and left_to_fetch > 0:
            page = self._list(
                **{
                    **kwargs,
                    'offset': next_offset,
                    'limit': _min_for_limit_param(left_to_fetch, chunk_size),
                }
            )
            yield from page.items

            fetched = len(page.items)
            next_offset += fetched
            left_to_fetch -= fetched

    return IterableListPage[T](first_page, iterator())

def _create(self, resource: dict) -> dict:
response = self.http_client.call(
url=self._url(),
Expand Down Expand Up @@ -84,6 +115,36 @@ async def _list(self, **kwargs: Any) -> ListPage:

return ListPage(parse_date_fields(pluck_data(response.json())))

def _list_iterable(self, **kwargs: Any) -> ListPageProtocolAsync[T]:
    """List the collection and return an object that can be awaited or asynchronously iterated over.

    Awaiting the returned object resolves to the first page. Asynchronously iterating over it
    yields the items of the first page and then keeps requesting following pages until the
    requested `limit` or the total reported by the first page is reached, or until a page comes
    back empty.

    NOTE: the very same awaitable for the first page is handed to both the awaiting and the
    iterating path, so a single returned object should presumably be either awaited or iterated
    over, not both.

    Args:
        **kwargs: Query parameters forwarded to `_list`. `offset` and `limit` additionally drive
            the paging. The optional `chunk_size` is removed from the parameters and only caps
            the `limit` of each individual `_list` call.

    Returns:
        An `IterableListPageAsync` wrapping the pending first-page request and the item iterator.
    """
    # `chunk_size` is consumed here, so it is taken out before the parameters reach `_list`.
    chunk_size = kwargs.pop('chunk_size', None)

    first_limit = _min_for_limit_param(kwargs.get('limit'), chunk_size)
    # `_list` is a coroutine function: calling it only creates the awaitable, nothing runs yet.
    first_page_awaitable = self._list(**{**kwargs, 'limit': first_limit})

    async def async_iterator() -> AsyncIterator[T]:
        page = await first_page_awaitable
        for entry in page.items:
            yield entry

        start_offset = kwargs.get('offset') or 0
        # A missing or zero `limit` falls back to the total reported by the first page.
        wanted = min(kwargs.get('limit') or page.total, page.total)

        fetched = len(page.items)
        next_offset = start_offset + fetched
        left_to_fetch = min(page.total - start_offset, wanted) - fetched

        # An empty page stops the iteration even when the bookkeeping still expects more items.
        while page.items and left_to_fetch > 0:
            page = await self._list(
                **{
                    **kwargs,
                    'offset': next_offset,
                    'limit': _min_for_limit_param(left_to_fetch, chunk_size),
                }
            )
            for entry in page.items:
                yield entry

            fetched = len(page.items)
            next_offset += fetched
            left_to_fetch -= fetched

    return IterableListPageAsync[T](first_page_awaitable, async_iterator())

async def _create(self, resource: dict) -> dict:
response = await self.http_client.call(
url=self._url(),
Expand All @@ -107,3 +168,76 @@ async def _get_or_create(
)

return parse_date_fields(pluck_data(response.json()))


class ListPageProtocol(Protocol[T], Iterable[T]):
    """Protocol for a single page of listed objects that can also be iterated over to get individual items.

    Besides being iterable, an implementation exposes the paging metadata of the page through
    the attributes below.
    """

    items: list[T]
    """List of returned objects on this page"""

    count: int
    """Count of the returned objects on this page"""

    offset: int
    """The offset of the first object specified in the API call"""

    limit: int
    """The limit on the number of returned objects specified in the API call"""

    total: int
    """Total number of objects matching the API call criteria"""

    desc: bool
    """Whether the listing is descending or not"""


class IterableListPage(ListPage[T], Generic[T]):
    """A `ListPage` that carries the data of one page and can be iterated over to get individual items.

    The page attributes are copied from an existing `ListPage`; iteration is delegated to the
    iterator supplied at construction time.
    """

    def __init__(self, list_page: ListPage[T], iterator: Iterator[T]) -> None:
        """Copy the page data from `list_page` and keep `iterator` for later iteration.

        Args:
            list_page: Already fetched page whose attributes are mirrored on this object.
            iterator: Iterator handed out by `__iter__`.
        """
        # The fields are copied one by one from the existing page; the parent initializer is not called.
        for field_name in ('items', 'offset', 'limit', 'count', 'total', 'desc'):
            setattr(self, field_name, getattr(list_page, field_name))
        self._iterator = iterator

    def __iter__(self) -> Iterator[T]:
        """Return the iterator over the items from API, possibly doing multiple API calls.

        The same iterator object is returned on every call, so its progress is shared between
        all loops over this page.
        """
        return self._iterator


class ListPageProtocolAsync(Protocol[T], AsyncIterable[T], Awaitable[ListPage[T]]):
    """Protocol for an object that can be both awaited and asynchronously iterated over.

    Awaiting it resolves to a `ListPage[T]`; asynchronously iterating over it yields items of type `T`.
    """


class IterableListPageAsync(Generic[T]):
    """Wrapper that is awaited to get a `ListPage` or asynchronously iterated over to get individual items.

    Both behaviors are delegated to the objects supplied at construction time.
    """

    def __init__(self, awaitable: Awaitable[ListPage[T]], async_iterator: AsyncIterator[T]) -> None:
        """Store the awaitable and the asynchronous iterator this wrapper delegates to.

        Args:
            awaitable: Awaitable resolving to the page; used when this object is awaited.
            async_iterator: Asynchronous iterator over the items; handed out by `__aiter__`.
        """
        self._async_iterator = async_iterator
        self._awaitable = awaitable

    def __aiter__(self) -> AsyncIterator[T]:
        """Return the asynchronous iterator over the items from API, possibly doing multiple API calls.

        The same iterator object is returned on every call.
        """
        return self._async_iterator

    def __await__(self) -> Generator[Any, Any, ListPage[T]]:
        """Delegate awaiting to the stored awaitable, which resolves to the `ListPage`."""
        page_awaitable = self._awaitable
        return page_awaitable.__await__()


def _min_for_limit_param(a: int | None, b: int | None) -> int | None:
    """Return the smaller of two limit parameters, where None and 0 both stand for "no limit".

    Args:
        a: First limit; None or 0 means unbounded.
        b: Second limit; None or 0 means unbounded.

    Returns:
        The smallest bounded limit, or None when neither limit is bounded.
    """
    # The API treats limit=0 like a missing limit, so 0 is handled exactly like None here.
    bounded = [value for value in (a, b) if value is not None and value != 0]
    return min(bounded) if bounded else None
12 changes: 6 additions & 6 deletions src/apify_client/clients/resource_clients/actor_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from apify_client.clients.resource_clients.actor import get_actor_representation

if TYPE_CHECKING:
from apify_client.clients.base.resource_collection_client import ListPage
from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync


class ActorCollectionClient(ResourceCollectionClient):
Expand All @@ -25,7 +25,7 @@ def list(
offset: int | None = None,
desc: bool | None = None,
sort_by: Literal['createdAt', 'stats.lastRunStartedAt'] | None = 'createdAt',
) -> ListPage[dict]:
) -> ListPageProtocol[dict]:
"""List the Actors the user has created or used.

https://docs.apify.com/api/v2#/reference/actors/actor-collection/get-list-of-actors
Expand All @@ -40,7 +40,7 @@ def list(
Returns:
The list of available Actors matching the specified filters.
"""
return self._list(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by)
return self._list_iterable(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by)

def create(
self,
Expand Down Expand Up @@ -142,15 +142,15 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
resource_path = kwargs.pop('resource_path', 'acts')
super().__init__(*args, resource_path=resource_path, **kwargs)

async def list(
def list(
self,
*,
my: bool | None = None,
limit: int | None = None,
offset: int | None = None,
desc: bool | None = None,
sort_by: Literal['createdAt', 'stats.lastRunStartedAt'] | None = 'createdAt',
) -> ListPage[dict]:
) -> ListPageProtocolAsync[dict]:
"""List the Actors the user has created or used.

https://docs.apify.com/api/v2#/reference/actors/actor-collection/get-list-of-actors
Expand All @@ -165,7 +165,7 @@ async def list(
Returns:
The list of available Actors matching the specified filters.
"""
return await self._list(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by)
return self._list_iterable(my=my, limit=limit, offset=offset, desc=desc, sortBy=sort_by)

async def create(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from apify_client.clients.resource_clients.actor_env_var import get_actor_env_var_representation

if TYPE_CHECKING:
from apify_client.clients.base.resource_collection_client import ListPage
from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync


class ActorEnvVarCollectionClient(ResourceCollectionClient):
Expand All @@ -17,15 +17,15 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
resource_path = kwargs.pop('resource_path', 'env-vars')
super().__init__(*args, resource_path=resource_path, **kwargs)

def list(self) -> ListPage[dict]:
def list(self) -> ListPageProtocol[dict]:
"""List the available actor environment variables.

https://docs.apify.com/api/v2#/reference/actors/environment-variable-collection/get-list-of-environment-variables

Returns:
The list of available actor environment variables.
"""
return self._list()
return self._list_iterable()

def create(
self,
Expand Down Expand Up @@ -62,15 +62,15 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
resource_path = kwargs.pop('resource_path', 'env-vars')
super().__init__(*args, resource_path=resource_path, **kwargs)

async def list(self) -> ListPage[dict]:
def list(self) -> ListPageProtocolAsync[dict]:
"""List the available actor environment variables.

https://docs.apify.com/api/v2#/reference/actors/environment-variable-collection/get-list-of-environment-variables

Returns:
The list of available actor environment variables.
"""
return await self._list()
return self._list_iterable()

async def create(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
if TYPE_CHECKING:
from apify_shared.consts import ActorSourceType

from apify_client.clients.base.resource_collection_client import ListPage
from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync


class ActorVersionCollectionClient(ResourceCollectionClient):
Expand All @@ -19,15 +19,15 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
resource_path = kwargs.pop('resource_path', 'versions')
super().__init__(*args, resource_path=resource_path, **kwargs)

def list(self) -> ListPage[dict]:
def list(self) -> ListPageProtocol[dict]:
"""List the available Actor versions.

https://docs.apify.com/api/v2#/reference/actors/version-collection/get-list-of-versions

Returns:
The list of available Actor versions.
"""
return self._list()
return self._list_iterable()

def create(
self,
Expand Down Expand Up @@ -88,15 +88,15 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
resource_path = kwargs.pop('resource_path', 'versions')
super().__init__(*args, resource_path=resource_path, **kwargs)

async def list(self) -> ListPage[dict]:
def list(self) -> ListPageProtocolAsync[dict]:
"""List the available Actor versions.

https://docs.apify.com/api/v2#/reference/actors/version-collection/get-list-of-versions

Returns:
The list of available Actor versions.
"""
return await self._list()
return self._list_iterable()

async def create(
self,
Expand Down
12 changes: 6 additions & 6 deletions src/apify_client/clients/resource_clients/build_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync

if TYPE_CHECKING:
from apify_client.clients.base.resource_collection_client import ListPage
from apify_client.clients.base.resource_collection_client import ListPageProtocol, ListPageProtocolAsync


class BuildCollectionClient(ResourceCollectionClient):
Expand All @@ -21,7 +21,7 @@ def list(
limit: int | None = None,
offset: int | None = None,
desc: bool | None = None,
) -> ListPage[dict]:
) -> ListPageProtocol[dict]:
"""List all Actor builds.

List all Actor builds, either of a single Actor, or all user's Actors, depending on where this client
Expand All @@ -38,7 +38,7 @@ def list(
Returns:
The retrieved Actor builds.
"""
return self._list(limit=limit, offset=offset, desc=desc)
return self._list_iterable(limit=limit, offset=offset, desc=desc)


class BuildCollectionClientAsync(ResourceCollectionClientAsync):
Expand All @@ -48,13 +48,13 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
resource_path = kwargs.pop('resource_path', 'actor-builds')
super().__init__(*args, resource_path=resource_path, **kwargs)

async def list(
def list(
self,
*,
limit: int | None = None,
offset: int | None = None,
desc: bool | None = None,
) -> ListPage[dict]:
) -> ListPageProtocolAsync[dict]:
"""List all Actor builds.

List all Actor builds, either of a single Actor, or all user's Actors, depending on where this client
Expand All @@ -71,4 +71,4 @@ async def list(
Returns:
The retrieved Actor builds.
"""
return await self._list(limit=limit, offset=offset, desc=desc)
return self._list_iterable(limit=limit, offset=offset, desc=desc)
Loading
Loading