-
-
Notifications
You must be signed in to change notification settings - Fork 17
Description
Affected version
main
Current and expected behavior
Reported in https://stackable-workspace.slack.com/archives/C02FZ581UCD/p1773388676214269
In
| pub async fn delete_orphaned_resources(self, client: &Client) -> Result<()> { |
This results e.g. in
opa-operator-deployment-554d4dcc7-6x89m 2026-03-13T07:52:38.840175Z DEBUG reconciling object{object.ref=OpaCluster.v1alpha2.opa.stackable.tech/opa.default object.reason=object updated}: kube_client::client: Unsuccessful: Status { status: Some(Failure), code: 403, message: "statefulsets.apps is forbidden: User \"system:serviceaccount:stackable-operators:opa-operator-serviceaccount\" cannot list resource \"statefulsets\" in API group \"apps\" in the namespace \"default\"", metadata: Some(ListMeta { continue_: None, remaining_item_count: None, resource_version: None, self_link: None }), reason: "Forbidden", details: Some(StatusDetails { name: "", group: "apps", kind: "statefulsets", uid: "", causes: [], retry_after_seconds: 0 }) }
or
opa-operator-deployment-554d4dcc7-6x89m 2026-03-13T07:52:38.840206Z DEBUG reconciling object{object.ref=OpaCluster.v1alpha2.opa.stackable.tech/opa.default object.reason=object updated}: stackable_operator::cluster_resources: Skipping deletion of orphaned statefulsets because the operator is not allowed to list them and is therefore probably not in charge of them.
Even worse: as we use a try_join, failing on e.g. StatefulSet means that we potentially don't properly clean up the remaining resource types.
This has been this way for years, so I guess not super critical, but also easily preventable with hopefully a stackable-operator bugfix - no operator changes needed.
Possible solution
I quickly prompted Claude with this, and it produced the following patch (untested, rough). We should bring this over the finish line:
diff --git a/crates/stackable-operator/src/client.rs b/crates/stackable-operator/src/client.rs
index bb9ceb1..5df403a 100644
--- a/crates/stackable-operator/src/client.rs
+++ b/crates/stackable-operator/src/client.rs
@@ -1,12 +1,20 @@
use std::{
+ any::TypeId,
+ collections::HashMap,
convert::TryFrom,
fmt::{Debug, Display},
+ sync::{Arc, RwLock},
+ time::{Duration, Instant},
};
use either::Either;
use futures::StreamExt;
use k8s_openapi::{
- ClusterResourceScope, NamespaceResourceScope, apimachinery::pkg::apis::meta::v1::LabelSelector,
+ ClusterResourceScope, NamespaceResourceScope,
+ api::authorization::v1::{
+ ResourceAttributes, SelfSubjectAccessReview, SelfSubjectAccessReviewSpec,
+ },
+ apimachinery::pkg::apis::meta::v1::LabelSelector,
};
use kube::{
Api, Config,
@@ -103,8 +111,17 @@ pub struct Client {
pub default_namespace: String,
pub kubernetes_cluster_info: KubernetesClusterInfo,
+
+ /// Cache of `SelfSubjectAccessReview` results keyed by (resource type, namespace).
+ list_permissions: Arc<RwLock<HashMap<(TypeId, String), (bool, Instant)>>>,
}
+/// How long a cached `SelfSubjectAccessReview` result is considered valid.
+/// A TTL is used rather than caching indefinitely because RBAC rules can change at runtime
+/// (e.g. an admin updates a `ClusterRole`), and we want to pick up such changes eventually
+/// without requiring an operator restart.
+const LIST_PERMISSION_TTL: Duration = Duration::from_secs(300);
+
impl Client {
pub fn new(
client: KubeClient,
@@ -125,6 +142,7 @@ impl Client {
delete_params: DeleteParams::default(),
default_namespace,
kubernetes_cluster_info,
+ list_permissions: Arc::default(),
}
}
@@ -520,6 +538,67 @@ impl Client {
Api::all(self.client.clone())
}
+ /// Returns whether the current service account is allowed to `list` resources of type `T`
+ /// in the given `namespace`, by performing a [`SelfSubjectAccessReview`].
+ ///
+ /// Results are cached per (resource type, namespace) pair to avoid a SAR API call on every
+ /// reconciliation. The cache has a TTL of [`LIST_PERMISSION_TTL`] so that RBAC changes made
+ /// at runtime are eventually picked up without requiring an operator restart.
+ ///
+ /// If the review request itself fails (e.g. due to a network error), this returns `true` so
+ /// that callers fall back to attempting the operation and handling any resulting error.
+ /// Failures are intentionally not cached: a transient error should not suppress deletion
+ /// for the full TTL duration.
+ pub async fn can_list<T>(&self, namespace: &str) -> bool
+ where
+ T: Resource<DynamicType = ()> + 'static,
+ {
+ let key = (TypeId::of::<T>(), namespace.to_string());
+
+ {
+ let cache = self.list_permissions.read().unwrap();
+ if let Some(&(allowed, cached_at)) = cache.get(&key) {
+ if cached_at.elapsed() < LIST_PERMISSION_TTL {
+ return allowed;
+ }
+ }
+ }
+
+ let sar = SelfSubjectAccessReview {
+ spec: SelfSubjectAccessReviewSpec {
+ resource_attributes: Some(ResourceAttributes {
+ namespace: Some(namespace.to_string()),
+ verb: Some("list".to_string()),
+ group: Some(T::group(&()).to_string()),
+ resource: Some(T::plural(&()).to_string()),
+ ..Default::default()
+ }),
+ ..Default::default()
+ },
+ ..Default::default()
+ };
+
+ let api: Api<SelfSubjectAccessReview> = Api::all(self.client.clone());
+ match api.create(&PostParams::default(), &sar).await {
+ Ok(response) => {
+ let allowed = response.status.map(|s| s.allowed).unwrap_or(false);
+ self.list_permissions
+ .write()
+ .unwrap()
+ .insert(key, (allowed, Instant::now()));
+ allowed
+ }
+ Err(err) => {
+ trace!(
+ ?err,
+ "Failed to perform SelfSubjectAccessReview for {}, assuming list is allowed",
+ T::plural(&())
+ );
+ true
+ }
+ }
+ }
+
#[deprecated(note = "Use Api::get_api instead", since = "0.26.0")]
pub fn get_namespaced_api<T>(&self, namespace: &str) -> Api<T>
where
diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs
index 6bb0474..3ec82e1 100644
--- a/crates/stackable-operator/src/cluster_resources.rs
+++ b/crates/stackable-operator/src/cluster_resources.rs
@@ -105,6 +105,7 @@ pub trait ClusterResource:
+ Resource<DynamicType = (), Scope = NamespaceResourceScope>
+ GetApi<Namespace = str>
+ Serialize
+ + 'static
{
/// This must be implemented for any [`ClusterResources`] that should be adapted before
/// applying depending on the chosen [`ClusterResourceApplyStrategy`].
@@ -720,47 +721,46 @@ impl<'a> ClusterResources<'a> {
return Ok(());
}
- match self.list_deployed_cluster_resources::<T>(client).await {
- Ok(deployed_cluster_resources) => {
- let mut orphaned_resources = Vec::new();
-
- for resource in deployed_cluster_resources {
- let resource_id = resource.uid().context(MissingObjectKeySnafu {
- key: "metadata/uid",
- })?;
- if !self.resource_ids.contains(&resource_id) {
- orphaned_resources.push(resource);
- }
- }
-
- if !orphaned_resources.is_empty() {
- info!(
- "Deleting orphaned {}: {}",
- T::plural(&()),
- ClusterResources::print_resources(&orphaned_resources),
- );
- for resource in orphaned_resources.iter() {
- client
- .delete(resource)
- .await
- .context(DeleteOrphanedResourceSnafu)?;
- }
- }
+ if !client.can_list::<T>(&self.namespace).await {
+ debug!(
+ "Skipping deletion of orphaned {} because the operator is not allowed to list \
+ them and is therefore probably not in charge of them.",
+ T::plural(&())
+ );
+ return Ok(());
+ }
- Ok(())
+ let deployed_cluster_resources = self
+ .list_deployed_cluster_resources::<T>(client)
+ .await
+ .context(ListClusterResourcesSnafu)?;
+
+ let mut orphaned_resources = Vec::new();
+
+ for resource in deployed_cluster_resources {
+ let resource_id = resource.uid().context(MissingObjectKeySnafu {
+ key: "metadata/uid",
+ })?;
+ if !self.resource_ids.contains(&resource_id) {
+ orphaned_resources.push(resource);
}
- Err(crate::client::Error::ListResources {
- source: kube::Error::Api(s),
- }) if s.is_forbidden() => {
- debug!(
- "Skipping deletion of orphaned {} because the operator is not allowed to list \
- them and is therefore probably not in charge of them.",
- T::plural(&())
- );
- Ok(())
+ }
+
+ if !orphaned_resources.is_empty() {
+ info!(
+ "Deleting orphaned {}: {}",
+ T::plural(&()),
+ ClusterResources::print_resources(&orphaned_resources),
+ );
+ for resource in orphaned_resources.iter() {
+ client
+ .delete(resource)
+ .await
+ .context(DeleteOrphanedResourceSnafu)?;
}
- Err(error) => Err(error).context(ListClusterResourcesSnafu),
}
+
+ Ok(())
}
/// Creates a string containing the names and, if present, namespaces of the given resources
Additional context
No response
Environment
No response
Would you like to work on fixing this bug?
maybe