Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions getstream/chat/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1700,3 +1700,46 @@ def send_user_custom_event(
path_params=path_params,
json=json,
)

@telemetry.operation_name("getstream.api.chat.get_retention_policy")
def get_retention_policy(self) -> StreamResponse[GetRetentionPolicyResponse]:
return self.get(
"/api/v2/chat/retention_policy", GetRetentionPolicyResponse
)

@telemetry.operation_name("getstream.api.chat.set_retention_policy")
def set_retention_policy(
self, policy: str, max_age_hours: int
) -> StreamResponse[SetRetentionPolicyResponse]:
json = SetRetentionPolicyRequest(
policy=policy, max_age_hours=max_age_hours
).to_dict()
return self.post(
"/api/v2/chat/retention_policy",
SetRetentionPolicyResponse,
json=json,
)

@telemetry.operation_name("getstream.api.chat.delete_retention_policy")
def delete_retention_policy(
self, policy: str
) -> StreamResponse[DeleteRetentionPolicyResponse]:
json = DeleteRetentionPolicyRequest(policy=policy).to_dict()
return self.post(
"/api/v2/chat/retention_policy/delete",
DeleteRetentionPolicyResponse,
json=json,
)

@telemetry.operation_name("getstream.api.chat.get_retention_policy_runs")
def get_retention_policy_runs(
self,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> StreamResponse[GetRetentionPolicyRunsResponse]:
query_params = build_query_param(limit=limit, offset=offset)
return self.get(
"/api/v2/chat/retention_policy/runs",
GetRetentionPolicyRunsResponse,
query_params=query_params,
)
118 changes: 118 additions & 0 deletions getstream/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23537,3 +23537,121 @@ class XiaomiConfigFields(DataClassJsonMixin):
secret: Optional[str] = dc_field(
default=None, metadata=dc_config(field_name="secret")
)


@dataclass
class RetentionPolicyConfig(DataClassJsonMixin):
max_age_hours: int = dc_field(metadata=dc_config(field_name="max_age_hours"))


@dataclass
class RetentionPolicy(DataClassJsonMixin):
app_pk: int = dc_field(metadata=dc_config(field_name="app_pk"))
policy: str = dc_field(metadata=dc_config(field_name="policy"))
config: RetentionPolicyConfig = dc_field(
metadata=dc_config(field_name="config")
)
enabled_at: Optional[datetime] = dc_field(
default=None,
metadata=dc_config(
field_name="enabled_at",
encoder=encode_datetime,
decoder=datetime_from_unix_ns,
),
)


@dataclass
class RetentionCleanupRunStats(DataClassJsonMixin):
channels_deleted: Optional[int] = dc_field(
default=None, metadata=dc_config(field_name="channels_deleted")
)
messages_deleted: Optional[int] = dc_field(
default=None, metadata=dc_config(field_name="messages_deleted")
)


@dataclass
class RetentionCleanupRun(DataClassJsonMixin):
app_pk: int = dc_field(metadata=dc_config(field_name="app_pk"))
policy: str = dc_field(metadata=dc_config(field_name="policy"))
status: str = dc_field(metadata=dc_config(field_name="status"))
stats: RetentionCleanupRunStats = dc_field(
metadata=dc_config(field_name="stats")
)
date: Optional[datetime] = dc_field(
default=None,
metadata=dc_config(
field_name="date",
encoder=encode_datetime,
decoder=datetime_from_unix_ns,
),
)
started_at: Optional[datetime] = dc_field(
default=None,
metadata=dc_config(
field_name="started_at",
encoder=encode_datetime,
decoder=datetime_from_unix_ns,
),
)
finished_at: Optional[datetime] = dc_field(
default=None,
metadata=dc_config(
field_name="finished_at",
encoder=encode_datetime,
decoder=datetime_from_unix_ns,
),
)
cursor_ts: Optional[datetime] = dc_field(
default=None,
metadata=dc_config(
field_name="cursor_ts",
encoder=encode_datetime,
decoder=datetime_from_unix_ns,
),
)
cursor_id: Optional[str] = dc_field(
default=None, metadata=dc_config(field_name="cursor_id")
)
error: Optional[str] = dc_field(
default=None, metadata=dc_config(field_name="error")
)


@dataclass
class SetRetentionPolicyRequest(DataClassJsonMixin):
policy: str = dc_field(metadata=dc_config(field_name="policy"))
max_age_hours: int = dc_field(metadata=dc_config(field_name="max_age_hours"))


@dataclass
class SetRetentionPolicyResponse(DataClassJsonMixin):
duration: str = dc_field(metadata=dc_config(field_name="duration"))
policy: RetentionPolicy = dc_field(metadata=dc_config(field_name="policy"))


@dataclass
class DeleteRetentionPolicyRequest(DataClassJsonMixin):
policy: str = dc_field(metadata=dc_config(field_name="policy"))


@dataclass
class DeleteRetentionPolicyResponse(DataClassJsonMixin):
duration: str = dc_field(metadata=dc_config(field_name="duration"))


@dataclass
class GetRetentionPolicyResponse(DataClassJsonMixin):
duration: str = dc_field(metadata=dc_config(field_name="duration"))
policies: "List[RetentionPolicy]" = dc_field(
metadata=dc_config(field_name="policies")
)


@dataclass
class GetRetentionPolicyRunsResponse(DataClassJsonMixin):
duration: str = dc_field(metadata=dc_config(field_name="duration"))
runs: "List[RetentionCleanupRun]" = dc_field(
metadata=dc_config(field_name="runs")
)
33 changes: 33 additions & 0 deletions tests/test_chat_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,36 @@ def test_event_hooks_sqs_sns(client: Stream):
finally:
# Restore original hooks
client.update_app(event_hooks=original_hooks or [])


def test_get_retention_policy(client: Stream):
"""Create a retention policy, then list all policies."""
try:
client.chat.set_retention_policy(policy="old-messages", max_age_hours=480)
except Exception as e:
if "404" in str(e) or "not found" in str(e).lower():
pytest.skip("Retention policy endpoints not available on this environment")
raise

try:
response = client.chat.get_retention_policy()
assert response.data.policies is not None
policies = [p.policy for p in response.data.policies]
assert "old-messages" in policies
finally:
try:
client.chat.delete_retention_policy(policy="old-messages")
except Exception:
pass


def test_get_retention_policy_runs(client: Stream):
"""Query retention policy run history."""
try:
response = client.chat.get_retention_policy_runs(limit=10, offset=0)
except Exception as e:
if "404" in str(e) or "not found" in str(e).lower():
pytest.skip("Retention policy endpoints not available on this environment")
raise

assert response.data.runs is not None
Loading