Orphaned resources cleanup tries to delete resources it doesn't have access to #1176

@sbernauer

Description

Affected version

main

Current and expected behavior

Reported in https://stackable-workspace.slack.com/archives/C02FZ581UCD/p1773388676214269

In

pub async fn delete_orphaned_resources(self, client: &Client) -> Result<()> {

we simply try to delete all kinds of resources, regardless of whether we have the permissions to do so.
This results e.g. in
opa-operator-deployment-554d4dcc7-6x89m 2026-03-13T07:52:38.840175Z DEBUG reconciling object{object.ref=OpaCluster.v1alpha2.opa.stackable.tech/opa.default object.reason=object updated}: kube_client::client: Unsuccessful: Status { status: Some(Failure), code: 403, message: "statefulsets.apps is forbidden: User \"system:serviceaccount:stackable-operators:opa-operator-serviceaccount\" cannot list resource \"statefulsets\" in API group \"apps\" in the namespace \"default\"", metadata: Some(ListMeta { continue_: None, remaining_item_count: None, resource_version: None, self_link: None }), reason: "Forbidden", details: Some(StatusDetails { name: "", group: "apps", kind: "statefulsets", uid: "", causes: [], retry_after_seconds: 0 }) }

or

opa-operator-deployment-554d4dcc7-6x89m 2026-03-13T07:52:38.840206Z DEBUG reconciling object{object.ref=OpaCluster.v1alpha2.opa.stackable.tech/opa.default object.reason=object updated}: stackable_operator::cluster_resources: Skipping deletion of orphaned statefulsets because the operator is not allowed to list them and is therefore probably not in charge of them.

Even worse: as we use a try_join, failing on e.g. StatefulSets means that we potentially don't properly clean up the remaining resource kinds.

This has been this way for years, so I guess it's not super critical, but it is also easily preventable with (hopefully) just a stackable-operator bugfix - no operator changes needed.
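The permission check the patch below performs programmatically is a SelfSubjectAccessReview, which can also be posed manually as a sanity check. The namespace and resource here match the log above but are otherwise arbitrary:

```yaml
# SelfSubjectAccessReview asking: may I (the current identity) list
# statefulsets.apps in the "default" namespace?
# Submit with: kubectl create -f ssar.yaml -o yaml
apiVersion: authorization.k8s.io/v1
kind: SelfSubjectAccessReview
spec:
  resourceAttributes:
    namespace: default
    verb: list
    group: apps
    resource: statefulsets
```

The API server fills in status.allowed in the response; that verdict is exactly what the patch caches.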

Possible solution

I quickly prompted Claude and it produced this patch (untested and all that). We should bring it over the finish line:

diff --git a/crates/stackable-operator/src/client.rs b/crates/stackable-operator/src/client.rs
index bb9ceb1..5df403a 100644
--- a/crates/stackable-operator/src/client.rs
+++ b/crates/stackable-operator/src/client.rs
@@ -1,12 +1,20 @@
 use std::{
+    any::TypeId,
+    collections::HashMap,
     convert::TryFrom,
     fmt::{Debug, Display},
+    sync::{Arc, RwLock},
+    time::{Duration, Instant},
 };
 
 use either::Either;
 use futures::StreamExt;
 use k8s_openapi::{
-    ClusterResourceScope, NamespaceResourceScope, apimachinery::pkg::apis::meta::v1::LabelSelector,
+    ClusterResourceScope, NamespaceResourceScope,
+    api::authorization::v1::{
+        ResourceAttributes, SelfSubjectAccessReview, SelfSubjectAccessReviewSpec,
+    },
+    apimachinery::pkg::apis::meta::v1::LabelSelector,
 };
 use kube::{
     Api, Config,
@@ -103,8 +111,17 @@ pub struct Client {
     pub default_namespace: String,
 
     pub kubernetes_cluster_info: KubernetesClusterInfo,
+
+    /// Cache of `SelfSubjectAccessReview` results keyed by (resource type, namespace).
+    list_permissions: Arc<RwLock<HashMap<(TypeId, String), (bool, Instant)>>>,
 }
 
+/// How long a cached `SelfSubjectAccessReview` result is considered valid.
+/// A TTL is used rather than caching indefinitely because RBAC rules can change at runtime
+/// (e.g. an admin updates a `ClusterRole`), and we want to pick up such changes eventually
+/// without requiring an operator restart.
+const LIST_PERMISSION_TTL: Duration = Duration::from_secs(300);
+
 impl Client {
     pub fn new(
         client: KubeClient,
@@ -125,6 +142,7 @@ impl Client {
             delete_params: DeleteParams::default(),
             default_namespace,
             kubernetes_cluster_info,
+            list_permissions: Arc::default(),
         }
     }
 
@@ -520,6 +538,67 @@ impl Client {
         Api::all(self.client.clone())
     }
 
+    /// Returns whether the current service account is allowed to `list` resources of type `T`
+    /// in the given `namespace`, by performing a [`SelfSubjectAccessReview`].
+    ///
+    /// Results are cached per (resource type, namespace) pair to avoid a SAR API call on every
+    /// reconciliation. The cache has a TTL of [`LIST_PERMISSION_TTL`] so that RBAC changes made
+    /// at runtime are eventually picked up without requiring an operator restart.
+    ///
+    /// If the review request itself fails (e.g. due to a network error), this returns `true` so
+    /// that callers fall back to attempting the operation and handling any resulting error.
+    /// Failures are intentionally not cached: a transient error should not suppress deletion
+    /// for the full TTL duration.
+    pub async fn can_list<T>(&self, namespace: &str) -> bool
+    where
+        T: Resource<DynamicType = ()> + 'static,
+    {
+        let key = (TypeId::of::<T>(), namespace.to_string());
+
+        {
+            let cache = self.list_permissions.read().unwrap();
+            if let Some(&(allowed, cached_at)) = cache.get(&key) {
+                if cached_at.elapsed() < LIST_PERMISSION_TTL {
+                    return allowed;
+                }
+            }
+        }
+
+        let sar = SelfSubjectAccessReview {
+            spec: SelfSubjectAccessReviewSpec {
+                resource_attributes: Some(ResourceAttributes {
+                    namespace: Some(namespace.to_string()),
+                    verb: Some("list".to_string()),
+                    group: Some(T::group(&()).to_string()),
+                    resource: Some(T::plural(&()).to_string()),
+                    ..Default::default()
+                }),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        let api: Api<SelfSubjectAccessReview> = Api::all(self.client.clone());
+        match api.create(&PostParams::default(), &sar).await {
+            Ok(response) => {
+                let allowed = response.status.map(|s| s.allowed).unwrap_or(false);
+                self.list_permissions
+                    .write()
+                    .unwrap()
+                    .insert(key, (allowed, Instant::now()));
+                allowed
+            }
+            Err(err) => {
+                trace!(
+                    ?err,
+                    "Failed to perform SelfSubjectAccessReview for {}, assuming list is allowed",
+                    T::plural(&())
+                );
+                true
+            }
+        }
+    }
+
     #[deprecated(note = "Use Api::get_api instead", since = "0.26.0")]
     pub fn get_namespaced_api<T>(&self, namespace: &str) -> Api<T>
     where
diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs
index 6bb0474..3ec82e1 100644
--- a/crates/stackable-operator/src/cluster_resources.rs
+++ b/crates/stackable-operator/src/cluster_resources.rs
@@ -105,6 +105,7 @@ pub trait ClusterResource:
     + Resource<DynamicType = (), Scope = NamespaceResourceScope>
     + GetApi<Namespace = str>
     + Serialize
+    + 'static
 {
     /// This must be implemented for any [`ClusterResources`] that should be adapted before
     /// applying depending on the chosen [`ClusterResourceApplyStrategy`].
@@ -720,47 +721,46 @@ impl<'a> ClusterResources<'a> {
             return Ok(());
         }
 
-        match self.list_deployed_cluster_resources::<T>(client).await {
-            Ok(deployed_cluster_resources) => {
-                let mut orphaned_resources = Vec::new();
-
-                for resource in deployed_cluster_resources {
-                    let resource_id = resource.uid().context(MissingObjectKeySnafu {
-                        key: "metadata/uid",
-                    })?;
-                    if !self.resource_ids.contains(&resource_id) {
-                        orphaned_resources.push(resource);
-                    }
-                }
-
-                if !orphaned_resources.is_empty() {
-                    info!(
-                        "Deleting orphaned {}: {}",
-                        T::plural(&()),
-                        ClusterResources::print_resources(&orphaned_resources),
-                    );
-                    for resource in orphaned_resources.iter() {
-                        client
-                            .delete(resource)
-                            .await
-                            .context(DeleteOrphanedResourceSnafu)?;
-                    }
-                }
+        if !client.can_list::<T>(&self.namespace).await {
+            debug!(
+                "Skipping deletion of orphaned {} because the operator is not allowed to list \
+                  them and is therefore probably not in charge of them.",
+                T::plural(&())
+            );
+            return Ok(());
+        }
 
-                Ok(())
+        let deployed_cluster_resources = self
+            .list_deployed_cluster_resources::<T>(client)
+            .await
+            .context(ListClusterResourcesSnafu)?;
+
+        let mut orphaned_resources = Vec::new();
+
+        for resource in deployed_cluster_resources {
+            let resource_id = resource.uid().context(MissingObjectKeySnafu {
+                key: "metadata/uid",
+            })?;
+            if !self.resource_ids.contains(&resource_id) {
+                orphaned_resources.push(resource);
             }
-            Err(crate::client::Error::ListResources {
-                source: kube::Error::Api(s),
-            }) if s.is_forbidden() => {
-                debug!(
-                    "Skipping deletion of orphaned {} because the operator is not allowed to list \
-                      them and is therefore probably not in charge of them.",
-                    T::plural(&())
-                );
-                Ok(())
+        }
+
+        if !orphaned_resources.is_empty() {
+            info!(
+                "Deleting orphaned {}: {}",
+                T::plural(&()),
+                ClusterResources::print_resources(&orphaned_resources),
+            );
+            for resource in orphaned_resources.iter() {
+                client
+                    .delete(resource)
+                    .await
+                    .context(DeleteOrphanedResourceSnafu)?;
             }
-            Err(error) => Err(error).context(ListClusterResourcesSnafu),
         }
+
+        Ok(())
     }
 
     /// Creates a string containing the names and if present namespaces of the given resources
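Independent of the kube plumbing, the TTL-cache part of the patch can be exercised in isolation. Here is a simplified, stdlib-only sketch (a plain String key stands in for the patch's (TypeId, String) key; the type name is hypothetical):

```rust
use std::{
    collections::HashMap,
    sync::RwLock,
    time::{Duration, Instant},
};

/// TTL cache for permission-check verdicts, keyed by resource plural.
/// Mirrors the `list_permissions` field in the proposed patch: each entry
/// stores the verdict together with the time it was cached.
struct PermissionCache {
    ttl: Duration,
    entries: RwLock<HashMap<String, (bool, Instant)>>,
}

impl PermissionCache {
    fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            entries: RwLock::new(HashMap::new()),
        }
    }

    /// Returns the cached verdict if it is still within the TTL,
    /// `None` if the entry is missing or expired.
    fn get(&self, key: &str) -> Option<bool> {
        let entries = self.entries.read().unwrap();
        entries
            .get(key)
            .and_then(|&(allowed, cached_at)| (cached_at.elapsed() < self.ttl).then_some(allowed))
    }

    /// Stores a fresh verdict with the current timestamp.
    fn insert(&self, key: String, allowed: bool) {
        self.entries
            .write()
            .unwrap()
            .insert(key, (allowed, Instant::now()));
    }
}
```

Expired entries are simply ignored on read (and overwritten on the next insert), which keeps the hot path to a single read lock - the same trade-off the patch makes by not caching failed reviews.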

Additional context

No response

Environment

No response

Would you like to work on fixing this bug?

maybe
