diff --git a/python/lib/sift_client/_internal/low_level_wrappers/rules.py b/python/lib/sift_client/_internal/low_level_wrappers/rules.py index 5e3794066..3658a0698 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/rules.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/rules.py @@ -53,7 +53,7 @@ from sift.rules.v1.rules_pb2_grpc import RuleServiceStub from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase -from sift_client._internal.low_level_wrappers.reports import ReportsLowLevelClient +from sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient from sift_client._internal.util.timestamp import to_pb_timestamp from sift_client._internal.util.util import count_non_none from sift_client.sift_types.rule import ( @@ -69,7 +69,7 @@ from datetime import datetime from sift_client.sift_types.channel import ChannelReference - from sift_client.sift_types.report import Report + from sift_client.sift_types.job import Job # Configure logging logger = logging.getLogger(__name__) @@ -587,8 +587,8 @@ async def evaluate_rules( report_name: str | None = None, tags: list[str | Tag] | None = None, organization_id: str | None = None, - ) -> tuple[int, Report | None, str | None]: - """Evaluate a rule. + ) -> tuple[int, str | None, Job | None]: + """Evaluate rules. Args: run_id: The run ID to evaluate. @@ -604,7 +604,7 @@ async def evaluate_rules( organization_id: The organization ID to evaluate. Returns: - The result of the rule execution. + The annotation_count, report_id, and job for the pending report. """ if count_non_none(run_id, asset_ids) > 1: raise ValueError( @@ -664,13 +664,13 @@ async def evaluate_rules( request ) response = cast("EvaluateRulesResponse", response) - created_annotation_count = response.created_annotation_count - report_id = response.report_id job_id = response.job_id - if report_id: - report = await ReportsLowLevelClient(self._grpc_client).get_report(report_id=report_id) - return created_annotation_count, report, job_id - return created_annotation_count, None, job_id + + if job_id: + job = await JobsLowLevelClient(self._grpc_client).get_job(job_id=job_id) + else: + job = None + return response.created_annotation_count, response.report_id, job async def get_rule_version(self, rule_version_id: str) -> Rule: """Get a rule at a specific version by rule_version_id. diff --git a/python/lib/sift_client/_tests/resources/test_jobs.py b/python/lib/sift_client/_tests/resources/test_jobs.py index 4ac74bec0..ebc9ea09f 100644 --- a/python/lib/sift_client/_tests/resources/test_jobs.py +++ b/python/lib/sift_client/_tests/resources/test_jobs.py @@ -8,6 +8,7 @@ """ from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock, patch import pytest from grpc.aio import AioRpcError @@ -291,6 +292,106 @@ async def test_retry_finished_job_no_effect(self, jobs_api_async): with pytest.raises(AioRpcError, match="job cannot be retried"): await jobs_api_async.retry(job) + class TestWaitUntilComplete: + """Tests for the async wait_until_complete method.""" + + @pytest.mark.asyncio + async def test_returns_immediately_when_job_already_complete(self, jobs_api_async): + """When get returns a completed job on first call, wait returns immediately.""" + job_id = "test-job-id" + mock_job = MagicMock() + mock_job.job_status = JobStatus.FINISHED + + with patch( + "sift_client.resources.jobs.JobsAPIAsync.get", + new_callable=AsyncMock, + return_value=mock_job, + ) as mock_get: + result = await jobs_api_async.wait_until_complete(job=job_id) + + assert result is mock_job + assert result.job_status == JobStatus.FINISHED + mock_get.assert_called_once_with(job_id) + + @pytest.mark.asyncio + async def test_returns_immediately_when_job_already_failed(self, jobs_api_async): + """When get returns a failed job on first call, wait returns immediately.""" + job_id = "test-job-id" + mock_job = MagicMock() + mock_job.job_status = JobStatus.FAILED + + with patch( + "sift_client.resources.jobs.JobsAPIAsync.get", + new_callable=AsyncMock, + return_value=mock_job, + ) as mock_get: + result = await jobs_api_async.wait_until_complete(job=job_id) + + assert result is mock_job + assert result.job_status == JobStatus.FAILED + mock_get.assert_called_once_with(job_id) + + @pytest.mark.asyncio + async def test_returns_immediately_when_job_already_cancelled(self, jobs_api_async): + """When get returns a cancelled job on first call, wait returns immediately.""" + job_id = "test-job-id" + mock_job = MagicMock() + mock_job.job_status = JobStatus.CANCELLED + + with patch( + "sift_client.resources.jobs.JobsAPIAsync.get", + new_callable=AsyncMock, + return_value=mock_job, + ) as mock_get: + result = await jobs_api_async.wait_until_complete(job=job_id) + + assert result is mock_job + assert result.job_status == JobStatus.CANCELLED + mock_get.assert_called_once_with(job_id) + + @pytest.mark.asyncio + async def test_polls_until_complete(self, jobs_api_async): + """When get returns running then finished, wait returns after second poll.""" + job_id = "test-job-id" + running_job = MagicMock() + running_job.job_status = JobStatus.RUNNING + finished_job = MagicMock() + finished_job.job_status = JobStatus.FINISHED + + with patch( + "sift_client.resources.jobs.JobsAPIAsync.get", + new_callable=AsyncMock, + side_effect=[running_job, finished_job], + ) as mock_get: + result = await jobs_api_async.wait_until_complete( + job=job_id, + polling_interval_secs=0.01, + timeout_secs=10.0, + ) + + assert result is finished_job + assert result.job_status == JobStatus.FINISHED + assert mock_get.call_count == 2 + + @pytest.mark.asyncio + async def test_raises_timeout_error_when_not_complete_in_time(self, jobs_api_async): + """When job never reaches a completed state, TimeoutError is raised.""" + job_id = "test-job-id" + running_job = MagicMock() + running_job.job_status = JobStatus.RUNNING + + with patch( + "sift_client.resources.jobs.JobsAPIAsync.get", + new_callable=AsyncMock, + return_value=running_job, + ): + with pytest.raises(TimeoutError): + await jobs_api_async.wait_until_complete( + job=job_id, + polling_interval_secs=0.05, + timeout_secs=0.1, + ) + class TestJobProperties: """Tests for job property methods.""" diff --git a/python/lib/sift_client/_tests/resources/test_reports.py b/python/lib/sift_client/_tests/resources/test_reports.py index 3eb3e6f05..2a72b5a58 100644 --- a/python/lib/sift_client/_tests/resources/test_reports.py +++ b/python/lib/sift_client/_tests/resources/test_reports.py @@ -1,3 +1,5 @@ +from unittest.mock import AsyncMock, MagicMock, patch + import pytest from sift_client.resources import ReportsAPI, ReportsAPIAsync @@ -7,6 +9,7 @@ RuleAction, RuleAnnotationType, ) +from sift_client.sift_types.job import RuleEvaluationDetails @pytest.fixture(scope="session") @@ -52,19 +55,185 @@ def test_client_binding(sift_client): assert isinstance(sift_client.async_.reports, ReportsAPIAsync) +@pytest.fixture +def reports_api_async_mock_client(mock_client): + """ReportsAPIAsync with a mock client for unit testing wait_until_complete.""" + mock_client.async_.jobs = MagicMock() + mock_client.async_.jobs.get = AsyncMock() + mock_client.async_.jobs.wait_until_complete = AsyncMock() + return ReportsAPIAsync(mock_client) + + +class TestReportsWaitUntilComplete: + """Unit tests for ReportsAPIAsync.wait_until_complete validation and input handling.""" + + @pytest.mark.asyncio + async def test_raises_when_neither_report_nor_job_provided(self, reports_api_async_mock_client): + with pytest.raises(ValueError, match="either report or job must be provided"): + await reports_api_async_mock_client.wait_until_complete() + + @pytest.mark.asyncio + async def test_raises_when_both_report_and_job_provided(self, reports_api_async_mock_client): + mock_report = MagicMock() + mock_report.job_id = "job-1" + mock_report.id_ = "report-1" + mock_job = MagicMock() + mock_job.id_ = "job-1" + mock_job.job_details = RuleEvaluationDetails(report_id="report-1") + + with pytest.raises(ValueError, match="exactly one of report or job must be provided"): + await reports_api_async_mock_client.wait_until_complete( + report=mock_report, job=mock_job + ) + + @pytest.mark.asyncio + async def test_valid_report_object_waits_and_returns_report( + self, reports_api_async_mock_client + ): + mock_report = MagicMock() + mock_report.job_id = "job-1" + mock_report.id_ = "report-1" + completed_report = MagicMock() + completed_report.id_ = "report-1" + + with patch.object( + reports_api_async_mock_client, + "get", + new_callable=AsyncMock, + return_value=completed_report, + ) as mock_get: + result = await reports_api_async_mock_client.wait_until_complete(report=mock_report) + + assert result is completed_report + reports_api_async_mock_client.client.async_.jobs.wait_until_complete.assert_awaited_once_with( + job="job-1", polling_interval_secs=5, timeout_secs=None + ) + mock_get.assert_awaited_once_with(report_id="report-1") + + @pytest.mark.asyncio + async def test_valid_report_id_str_fetches_report_then_waits_and_returns( + self, reports_api_async_mock_client + ): + report_id = "report-1" + mock_report_from_get = MagicMock() + mock_report_from_get.job_id = "job-1" + mock_report_from_get.id_ = report_id + completed_report = MagicMock() + completed_report.id_ = report_id + + with patch.object( + reports_api_async_mock_client, + "get", + new_callable=AsyncMock, + side_effect=[mock_report_from_get, completed_report], + ) as mock_get: + result = await reports_api_async_mock_client.wait_until_complete(report=report_id) + + assert result is completed_report + assert mock_get.await_count == 2 + reports_api_async_mock_client.client.async_.jobs.wait_until_complete.assert_awaited_once_with( + job="job-1", polling_interval_secs=5, timeout_secs=None + ) + + @pytest.mark.asyncio + async def test_valid_job_object_rule_evaluation_waits_and_returns_report( + self, reports_api_async_mock_client + ): + mock_job = MagicMock() + mock_job.id_ = "job-1" + mock_job.job_details = RuleEvaluationDetails(report_id="report-1") + completed_report = MagicMock() + completed_report.id_ = "report-1" + + with patch.object( + reports_api_async_mock_client, + "get", + new_callable=AsyncMock, + return_value=completed_report, + ) as mock_get: + result = await reports_api_async_mock_client.wait_until_complete(job=mock_job) + + assert result is completed_report + reports_api_async_mock_client.client.async_.jobs.wait_until_complete.assert_awaited_once_with( + job="job-1", polling_interval_secs=5, timeout_secs=None + ) + mock_get.assert_awaited_once_with(report_id="report-1") + + @pytest.mark.asyncio + async def test_valid_job_id_str_fetches_job_then_waits_and_returns_report( + self, reports_api_async_mock_client + ): + job_id = "job-1" + report_id = "report-1" + mock_job_from_get = MagicMock() + mock_job_from_get.id_ = job_id + mock_job_from_get.job_details = RuleEvaluationDetails(report_id=report_id) + reports_api_async_mock_client.client.async_.jobs.get = AsyncMock( + return_value=mock_job_from_get + ) + completed_report = MagicMock() + completed_report.id_ = report_id + + with patch.object( + reports_api_async_mock_client, + "get", + new_callable=AsyncMock, + return_value=completed_report, + ) as mock_get: + result = await reports_api_async_mock_client.wait_until_complete(job=job_id) + + assert result is completed_report + reports_api_async_mock_client.client.async_.jobs.get.assert_awaited_once_with(job_id=job_id) + reports_api_async_mock_client.client.async_.jobs.wait_until_complete.assert_awaited_once_with( + job=job_id, polling_interval_secs=5, timeout_secs=None + ) + mock_get.assert_awaited_once_with(report_id=report_id) + + @pytest.mark.asyncio + async def test_raises_when_job_object_not_rule_evaluation(self, reports_api_async_mock_client): + mock_job = MagicMock() + mock_job.id_ = "job-1" + mock_job.job_details = None # not RuleEvaluationDetails + + with pytest.raises(ValueError, match="job is not a rule evaluation job"): + await reports_api_async_mock_client.wait_until_complete(job=mock_job) + + reports_api_async_mock_client.client.async_.jobs.wait_until_complete.assert_not_awaited() + + @pytest.mark.asyncio + async def test_raises_when_job_id_str_fetches_non_rule_evaluation_job( + self, reports_api_async_mock_client + ): + job_id = "job-1" + mock_job_from_get = MagicMock() + mock_job_from_get.id_ = job_id + mock_job_from_get.job_details = None # e.g. data import job + reports_api_async_mock_client.client.async_.jobs.get = AsyncMock( + return_value=mock_job_from_get + ) + + with pytest.raises(ValueError, match="job is not a rule evaluation job"): + await reports_api_async_mock_client.wait_until_complete(job=job_id) + + reports_api_async_mock_client.client.async_.jobs.get.assert_awaited_once_with(job_id=job_id) + reports_api_async_mock_client.client.async_.jobs.wait_until_complete.assert_not_awaited() + + @pytest.mark.integration class TestReports: def test_create_from_rule_versions(self, nostromo_run, test_rule, sift_client): """Create a report from specific rule version IDs.""" rule_versions = sift_client.rules.list_rule_versions(test_rule) assert rule_versions, "test_rule should have at least one version" - report = sift_client.reports.create_from_rule_versions( + job = sift_client.reports.create_from_rule_versions( name="report_from_rule_versions", run=nostromo_run, organization_id=nostromo_run.organization_id, rule_versions=[rule_versions[0].rule_version_id], ) - assert report is not None + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + report = sift_client.reports.get(report_id=job.job_details.report_id) assert report.run_id == nostromo_run.id_ assert report.name == "report_from_rule_versions" @@ -74,23 +243,61 @@ def test_create_from_rule_versions_with_rule_version_objects( """Create a report passing RuleVersion instances.""" rule_versions = sift_client.rules.list_rule_versions(test_rule) assert rule_versions - report = sift_client.reports.create_from_rule_versions( + job = sift_client.reports.create_from_rule_versions( name="report_from_rule_versions_objs", run=nostromo_run, organization_id=nostromo_run.organization_id, rule_versions=rule_versions[:1], ) - assert report is not None + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + report = sift_client.reports.get(report_id=job.job_details.report_id) assert report.run_id == nostromo_run.id_ def test_create_from_rules(self, nostromo_run, test_rule, sift_client): - report_from_rules = sift_client.reports.create_from_rules( + job = sift_client.reports.create_from_rules( name="report_from_rules", run=nostromo_run, rules=[test_rule], ) - assert report_from_rules is not None - assert report_from_rules.run_id == nostromo_run.id_ + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + report = sift_client.reports.get(report_id=job.job_details.report_id) + assert report.run_id == nostromo_run.id_ + + @pytest.mark.asyncio + async def test_wait_until_complete(self, nostromo_run, test_rule, sift_client): + """Create a report and wait for its job to complete via jobs.wait_until_complete.""" + job = sift_client.reports.create_from_rules( + name="report_wait_until_complete", + run=nostromo_run, + rules=[test_rule], + ) + assert job is not None + assert job.id_ + assert isinstance(job.job_details, RuleEvaluationDetails) + + await sift_client.async_.jobs.wait_until_complete( + job=job, + polling_interval_secs=2, + timeout_secs=120, + ) + completed_report = await sift_client.async_.reports.get(report_id=job.job_details.report_id) + + assert completed_report is not None + assert completed_report.id_ == job.job_details.report_id + assert completed_report.job_id == job.id_ + + completed_rule_statuses = ( + ReportRuleStatus.FINISHED, + ReportRuleStatus.FAILED, + ReportRuleStatus.CANCELED, + ReportRuleStatus.ERROR, + ) + assert len(completed_report.summaries) == 1 + assert any(s.status in completed_rule_statuses for s in completed_report.summaries), ( + "expected rule summary to be in a completed state" + ) def test_create_from_applicable_rules( self, test_rule, nostromo_asset, nostromo_run, sift_client @@ -98,13 +305,15 @@ def test_create_from_applicable_rules( if not test_rule.asset_ids: # Test rule may exist but be in a state where it no longer applies to the asset associated w/ the run so re-attach it if necessary. test_rule = test_rule.update(update={"asset_ids": [nostromo_asset._id_or_error]}) - report_from_applicable_rules = sift_client.reports.create_from_applicable_rules( + job = sift_client.reports.create_from_applicable_rules( name="report_from_applicable_rules_run", run=nostromo_run, organization_id=nostromo_run.organization_id, ) - assert report_from_applicable_rules is not None - assert report_from_applicable_rules.run_id == nostromo_run.id_ + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + report = sift_client.reports.get(report_id=job.job_details.report_id) + assert report.run_id == nostromo_run.id_ def test_list(self, nostromo_asset, nostromo_run, tags, sift_client): reports = sift_client.reports.list_( @@ -114,27 +323,33 @@ def test_list(self, nostromo_asset, nostromo_run, tags, sift_client): assert len(reports) > 0 def test_rerun(self, nostromo_asset, nostromo_run, test_rule, sift_client): - report_from_rules = sift_client.reports.create_from_rules( + job = sift_client.reports.create_from_rules( name="report_from_rules", run=nostromo_run, rules=[test_rule], ) - assert report_from_rules is not None - job_id, rerun_report_id = sift_client.reports.rerun(report=report_from_rules) - rerun_report = sift_client.reports.get(report_id=rerun_report_id) + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + rerun_job = sift_client.reports.rerun(report=job.job_details.report_id) + assert rerun_job is not None + assert rerun_job.id_ + assert isinstance(rerun_job.job_details, RuleEvaluationDetails) + rerun_report = sift_client.reports.get(report_id=rerun_job.job_details.report_id) assert rerun_report is not None assert rerun_report.run_id == nostromo_run.id_ - assert rerun_report.rerun_from_report_id == report_from_rules.id_ + assert rerun_report.rerun_from_report_id == job.job_details.report_id def test_update(self, nostromo_asset, nostromo_run, test_rule, sift_client): - report_from_rules = sift_client.reports.create_from_rules( + job = sift_client.reports.create_from_rules( name="report_from_rules", run=nostromo_run, rules=[test_rule], ) - assert report_from_rules is not None + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + report = sift_client.reports.get(report_id=job.job_details.report_id) updated_report = sift_client.reports.update( - report=report_from_rules, + report=report, update={ "metadata": { "test_type": "ci", @@ -149,41 +364,50 @@ def test_find_multiple(self, sift_client): sift_client.reports.find(name="report_from_rules") def test_cancel(self, nostromo_asset, nostromo_run, test_rule, sift_client): - report_from_rules = sift_client.reports.create_from_rules( + job = sift_client.reports.create_from_rules( name="report_from_rules", run=nostromo_run, rules=[test_rule], ) - assert report_from_rules is not None - job_id, second_rerun_report_id = sift_client.reports.rerun(report=report_from_rules) - assert second_rerun_report_id is not None - sift_client.reports.cancel(report=second_rerun_report_id) - canceled_report = sift_client.reports.find(report_ids=[second_rerun_report_id]) + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + second_rerun_job = sift_client.reports.rerun(report=job.job_details.report_id) + assert second_rerun_job is not None + assert isinstance(second_rerun_job.job_details, RuleEvaluationDetails) + sift_client.reports.cancel(report=second_rerun_job.job_details.report_id) + canceled_report = sift_client.reports.find( + report_ids=[second_rerun_job.job_details.report_id] + ) assert canceled_report is not None for summary in canceled_report.summaries: # Sometimes the report finishes before it can be canceled. assert summary.status in [ReportRuleStatus.CANCELED, ReportRuleStatus.FINISHED] def test_archive(self, nostromo_run, test_rule, sift_client): - report_from_rules = sift_client.reports.create_from_rules( + job = sift_client.reports.create_from_rules( name="report_from_rules", run=nostromo_run, rules=[test_rule], ) - assert report_from_rules is not None - archived_report = sift_client.reports.archive(report=report_from_rules) + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + job.wait_until_complete(polling_interval_secs=2, timeout_secs=120) + report = sift_client.reports.get(report_id=job.job_details.report_id) + archived_report = sift_client.reports.archive(report=report) assert archived_report is not None assert archived_report.is_archived == True def test_unarchive(self, nostromo_run, test_rule, sift_client): - # create a report, archive it, then unarchive it - report_from_rules = sift_client.reports.create_from_rules( + # Create, wait for completion, then archive to ensure we have an archived report + job = sift_client.reports.create_from_rules( name="report_from_rules_unarchive", run=nostromo_run, rules=[test_rule], ) - assert report_from_rules is not None - archived_report = sift_client.reports.archive(report=report_from_rules) + assert job is not None + assert isinstance(job.job_details, RuleEvaluationDetails) + report = sift_client.reports.get(report_id=job.job_details.report_id) + archived_report = sift_client.reports.archive(report=report) assert archived_report.is_archived is True unarchived_report = sift_client.reports.unarchive(report=archived_report) assert unarchived_report is not None diff --git a/python/lib/sift_client/resources/jobs.py b/python/lib/sift_client/resources/jobs.py index 2e6c86408..c3f775389 100644 --- a/python/lib/sift_client/resources/jobs.py +++ b/python/lib/sift_client/resources/jobs.py @@ -1,5 +1,7 @@ from __future__ import annotations +import asyncio +import time from typing import TYPE_CHECKING from sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient @@ -155,3 +157,35 @@ async def retry(self, job: Job | str) -> Job: job_id = job._id_or_error if isinstance(job, Job) else job updated_job = await self._low_level_client.retry_job(job_id) return self._apply_client_to_instance(updated_job) + + async def wait_until_complete( + self, + *, + job: Job | str, + polling_interval_secs: int = 5, + timeout_secs: int | None = None, + ) -> Job: + """Wait until the job is complete or the timeout is reached. + + Polls the job status at the given interval until the job is FINISHED, + FAILED, or CANCELLED, returning the completed Job + + Args: + job: The Job or job_id to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5s. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + Defaults to None (indefinite). + + Returns: + The Job in the completed state. + """ + job_id = job._id_or_error if isinstance(job, Job) else job + + start = time.monotonic() + while True: + job = await self.get(job_id) + if job.job_status in (JobStatus.FINISHED, JobStatus.FAILED, JobStatus.CANCELLED): + return job + if timeout_secs is not None and (time.monotonic() - start) >= timeout_secs: + raise TimeoutError(f"Job {job_id} did not complete within {timeout_secs} seconds") + await asyncio.sleep(polling_interval_secs) diff --git a/python/lib/sift_client/resources/reports.py b/python/lib/sift_client/resources/reports.py index df3b4f0d1..37e69fd5d 100644 --- a/python/lib/sift_client/resources/reports.py +++ b/python/lib/sift_client/resources/reports.py @@ -5,6 +5,7 @@ from sift_client._internal.low_level_wrappers.reports import ReportsLowLevelClient from sift_client._internal.low_level_wrappers.rules import RulesLowLevelClient from sift_client.resources._base import ResourceBase +from sift_client.sift_types.job import Job, RuleEvaluationDetails from sift_client.sift_types.report import Report, ReportUpdate from sift_client.sift_types.rule import Rule, RuleVersion from sift_client.sift_types.run import Run @@ -168,7 +169,7 @@ async def create_from_template( run_id: str, organization_id: str | None = None, name: str | None = None, - ) -> Report | None: + ) -> Job | None: """Create a new report from a report template. Args: @@ -178,21 +179,15 @@ async def create_from_template( name: Optional name for the report. Returns: - The created Report or None if no report was created. + The Job for the pending report, or None if no report was created. """ - ( - created_annotation_count, - created_report, - job_id, - ) = await self._rules_low_level_client.evaluate_rules( + _annotation_count, _report_id, job = await self._rules_low_level_client.evaluate_rules( report_template_id=report_template_id, run_id=run_id, organization_id=organization_id, report_name=name, ) - if created_report: - return self._apply_client_to_instance(created_report) - return None + return self._apply_client_to_instance(job) if job is not None else None async def create_from_rules( self, @@ -201,7 +196,7 @@ async def create_from_rules( run: Run | str | None = None, organization_id: str | None = None, rules: list[Rule] | list[str], - ) -> Report | None: + ) -> Job | None: """Create a new report from rules. Args: @@ -211,22 +206,16 @@ async def create_from_rules( rules: List of rules or rule IDs to include in the report. Returns: - The created Report or None if no report was created. + The Job for the pending report, or None if no report was created. """ - ( - created_annotation_count, - created_report, - job_id, - ) = await self._rules_low_level_client.evaluate_rules( + _annotation_count, _report_id, job = await self._rules_low_level_client.evaluate_rules( run_id=run._id_or_error if isinstance(run, Run) else run, organization_id=organization_id, rule_ids=[rule._id_or_error if isinstance(rule, Rule) else rule for rule in rules] or [], report_name=name, ) - if created_report: - return self._apply_client_to_instance(created_report) - return None + return self._apply_client_to_instance(job) if job is not None else None async def create_from_applicable_rules( self, @@ -236,7 +225,7 @@ async def create_from_applicable_rules( name: str | None = None, start_time: datetime | None = None, end_time: datetime | None = None, - ) -> Report | None: + ) -> Job | None: """Create a new report from applicable rules based on a run. If you want to evaluate against assets, use the rules client instead since no report is created in that case. @@ -248,13 +237,9 @@ async def create_from_applicable_rules( end_time: Optional end time to evaluate rules against. Returns: - The created Report or None if no report was created. + The Job for the pending report, or None if no report was created. """ - ( - created_annotation_count, - created_report, - job_id, - ) = await self._rules_low_level_client.evaluate_rules( + _annotation_count, _report_id, job = await self._rules_low_level_client.evaluate_rules( run_id=run._id_or_error if isinstance(run, Run) else run, organization_id=organization_id, start_time=start_time, @@ -262,9 +247,7 @@ async def create_from_applicable_rules( report_name=name, all_applicable_rules=True, ) - if created_report: - return self._apply_client_to_instance(created_report) - return None + return self._apply_client_to_instance(job) if job is not None else None async def create_from_rule_versions( self, @@ -273,7 +256,7 @@ async def create_from_rule_versions( run: Run | str | None = None, organization_id: str | None = None, rule_versions: list[RuleVersion] | list[str], - ) -> Report | None: + ) -> Job | None: """Create a new report from rule versions. Args: @@ -283,12 +266,12 @@ async def create_from_rule_versions( rule_versions: List of RuleVersions or rule_version IDs to include in the report. Returns: - The created Report or None if no report was created. + The Job for the pending report, or None if no report was created. """ ( - created_annotation_count, - created_report, - job_id, + _annotation_count, + _report_id, + job, ) = await self._rules_low_level_client.evaluate_rules( run_id=run._id_or_error if isinstance(run, Run) else run, organization_id=organization_id, @@ -301,27 +284,27 @@ async def create_from_rule_versions( or [], report_name=name, ) - if created_report: - return self._apply_client_to_instance(created_report) - return None + return self._apply_client_to_instance(job) if job is not None else None async def rerun( self, *, report: str | Report, - ) -> tuple[str, str]: + ) -> Job: """Rerun a report. Args: report: The Report or report ID to rerun. Returns: - A tuple of (job_id, new_report_id). + The Job for the new pending report. """ - report_id = report.id_ if isinstance(report, Report) else report - if not isinstance(report_id, str): - raise TypeError(f"report_id must be a string not {type(report_id)}") - return await self._low_level_client.rerun_report(report_id=report_id) + report_id = report._id_or_error if isinstance(report, Report) else report + if not report_id: + raise ValueError("report_id must be provided") + job_id, _new_report_id = await self._low_level_client.rerun_report(report_id=report_id) + job = await self.client.async_.jobs.get(job_id=job_id) + return self._apply_client_to_instance(job) async def cancel( self, @@ -333,9 +316,9 @@ async def cancel( Args: report: The Report or report ID to cancel. """ - report_id = report.id_ if isinstance(report, Report) else report - if not isinstance(report_id, str): - raise TypeError(f"report_id must be a string not {type(report_id)}") + report_id = report._id_or_error if isinstance(report, Report) else report + if not report_id: + raise ValueError("report_id must be provided") await self._low_level_client.cancel_report(report_id=report_id) async def update(self, report: str | Report, update: ReportUpdate | dict) -> Report: @@ -345,7 +328,7 @@ async def update(self, report: str | Report, update: ReportUpdate | dict) -> Rep report: The Report or report ID to update. update: The updates to apply. """ - report_id = report.id_ if isinstance(report, Report) else report + report_id = report._id_or_error if isinstance(report, Report) else report if isinstance(update, dict): update = ReportUpdate.model_validate(update) @@ -359,7 +342,7 @@ async def archive( report: str | Report, ) -> Report: """Archive a report.""" - report_id = report.id_ if isinstance(report, Report) else report + report_id = report._id_or_error if isinstance(report, Report) else report update = ReportUpdate(is_archived=True) update.resource_id = report_id updated_report = await self._low_level_client.update_report(update=update) @@ -371,8 +354,78 @@ async def unarchive( report: str | Report, ) -> Report: """Unarchive a report.""" - report_id = report.id_ if isinstance(report, Report) else report + report_id = report._id_or_error if isinstance(report, Report) else report update = ReportUpdate(is_archived=False) update.resource_id = report_id updated_report = await self._low_level_client.update_report(update=update) return self._apply_client_to_instance(updated_report) + + async def wait_until_complete( + self, + *, + report: Report | str | None = None, + job: Job | str | None = None, + polling_interval_secs: int = 5, + timeout_secs: int | None = None, + ) -> Report: + """Wait until the report is complete or the timeout is reached. + + Polls the report job status at the given interval until the job is FINISHED, + FAILED, or CANCELLED, returning the completed Report. + + Either a report or job must be provided. The job must be a rule evaluation job. + + Args: + report: The Report or report ID to wait for. + job: The pending rule evaluation Job or job ID to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5s. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + Defaults to None (indefinite). + + Returns: + The Report in the completed state. + + Raises: + ValueError: If both or neither report and job are provided, or if + job is not a rule evaluation job. + """ + if report is not None and job is not None: + raise ValueError("exactly one of report or job must be provided") + if report is None and job is None: + raise ValueError("either report or job must be provided") + + report_id: str | None = None + job_id: str | None = None + if report is not None: + if isinstance(report, str): + report_obj = await self.get(report_id=report) + job_id = report_obj.job_id + report_id = report + else: + job_id = report.job_id + report_id = report.id_ + else: + # job is not None here (we raised if both report and job were None) + assert job is not None + if isinstance(job, str): + job_obj = await self.client.async_.jobs.get(job_id=job) + job_id = job + job_details = job_obj.job_details + else: + job_id = job.id_ + job_details = job.job_details + if not isinstance(job_details, RuleEvaluationDetails): + raise ValueError("job is not a rule evaluation job") + report_id = job_details.report_id + + if not report_id: + raise ValueError("could not retrieve report_id") + if not job_id: + raise ValueError("could not retrieve job_id") + + await self.client.async_.jobs.wait_until_complete( + job=job_id, + polling_interval_secs=polling_interval_secs, + timeout_secs=timeout_secs, + ) + return await self.get(report_id=report_id) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 0fe3a628f..56f49cd7b 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -745,6 +745,25 @@ class JobsAPI: """ ... + def wait_until_complete( + self, *, job: Job | str, polling_interval_secs: int = 5, timeout_secs: int | None = None + ) -> Job: + """Wait until the job is complete or the timeout is reached. + + Polls the job status at the given interval until the job is FINISHED, + FAILED, or CANCELLED, returning the completed Job + + Args: + job: The Job or job_id to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5s. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + Defaults to None (indefinite). + + Returns: + The Job in the completed state. + """ + ... + class PingAPI: """Sync counterpart to `PingAPIAsync`. @@ -803,7 +822,7 @@ class ReportsAPI: name: str | None = None, start_time: datetime | None = None, end_time: datetime | None = None, - ) -> Report | None: + ) -> Job | None: """Create a new report from applicable rules based on a run. If you want to evaluate against assets, use the rules client instead since no report is created in that case. @@ -815,7 +834,7 @@ class ReportsAPI: end_time: Optional end time to evaluate rules against. Returns: - The created Report or None if no report was created. + The Job for the pending report, or None if no report was created. """ ... @@ -826,7 +845,7 @@ class ReportsAPI: run: Run | str | None = None, organization_id: str | None = None, rule_versions: list[RuleVersion] | list[str], - ) -> Report | None: + ) -> Job | None: """Create a new report from rule versions. Args: @@ -836,7 +855,7 @@ class ReportsAPI: rule_versions: List of RuleVersions or rule_version IDs to include in the report. Returns: - The created Report or None if no report was created. + The Job for the pending report, or None if no report was created. """ ... @@ -847,7 +866,7 @@ class ReportsAPI: run: Run | str | None = None, organization_id: str | None = None, rules: list[Rule] | list[str], - ) -> Report | None: + ) -> Job | None: """Create a new report from rules. Args: @@ -857,7 +876,7 @@ class ReportsAPI: rules: List of rules or rule IDs to include in the report. Returns: - The created Report or None if no report was created. + The Job for the pending report, or None if no report was created. """ ... @@ -868,7 +887,7 @@ class ReportsAPI: run_id: str, organization_id: str | None = None, name: str | None = None, - ) -> Report | None: + ) -> Job | None: """Create a new report from a report template. Args: @@ -878,7 +897,7 @@ class ReportsAPI: name: Optional name for the report. Returns: - The created Report or None if no report was created. + The Job for the pending report, or None if no report was created. """ ... @@ -960,14 +979,14 @@ class ReportsAPI: """ ... - def rerun(self, *, report: str | Report) -> tuple[str, str]: + def rerun(self, *, report: str | Report) -> Job: """Rerun a report. Args: report: The Report or report ID to rerun. Returns: - A tuple of (job_id, new_report_id). + The Job for the new pending report. """ ... @@ -984,6 +1003,37 @@ class ReportsAPI: """ ... + def wait_until_complete( + self, + *, + report: Report | str | None = None, + job: Job | str | None = None, + polling_interval_secs: int = 5, + timeout_secs: int | None = None, + ) -> Report: + """Wait until the report is complete or the timeout is reached. + + Polls the report job status at the given interval until the job is FINISHED, + FAILED, or CANCELLED, returning the completed Report. + + Either a report or job must be provided. The job must be a rule evaluation job. + + Args: + report: The Report or report ID to wait for. + job: The pending rule evaluation Job or job ID to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5s. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + Defaults to None (indefinite). + + Returns: + The Report in the completed state. + + Raises: + ValueError: If both or neither report and job are provided, or if + job is not a rule evaluation job. + """ + ... + class RulesAPI: """Sync counterpart to `RulesAPIAsync`. diff --git a/python/lib/sift_client/sift_types/job.py b/python/lib/sift_client/sift_types/job.py index 5b475f83a..32b355763 100644 --- a/python/lib/sift_client/sift_types/job.py +++ b/python/lib/sift_client/sift_types/job.py @@ -286,3 +286,29 @@ def retry(self) -> Job: updated_job = self.client.jobs.retry(self) self._update(updated_job) return self + + def wait_until_complete( + self, + *, + polling_interval_secs: int = 5, + timeout_secs: int | None = None, + ) -> Job: + """Wait until the job is complete or the timeout is reached. + + Polls the job status at the given interval until the job is FINISHED, + FAILED, or CANCELLED, returning the completed Job + + Args: + job: The Job or job_id to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5s. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + Defaults to None (indefinite). + + Returns: + The Job in the completed state. + """ + completed_job = self.client.jobs.wait_until_complete( + job=self, polling_interval_secs=polling_interval_secs, timeout_secs=timeout_secs + ) + self._update(completed_job) + return self