@@ -23,6 +23,7 @@ import (
2323 "github.com/go-logr/logr"
2424 appsv1 "k8s.io/api/apps/v1"
2525 corev1 "k8s.io/api/core/v1"
26+ apierrors "k8s.io/apimachinery/pkg/api/errors"
2627 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728 "k8s.io/apimachinery/pkg/labels"
2829 "k8s.io/apimachinery/pkg/types"
@@ -37,6 +38,7 @@ import (
3738 "sigs.k8s.io/controller-runtime/pkg/predicate"
3839 "sigs.k8s.io/controller-runtime/pkg/reconcile"
3940
41+ kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
4042 "sigs.k8s.io/kueue/pkg/controller/jobframework"
4143 podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod/constants"
4244 clientutil "sigs.k8s.io/kueue/pkg/util/client"
@@ -65,11 +67,63 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
6567 log := ctrl .LoggerFrom (ctx )
6668 log .V (2 ).Info ("Reconcile StatefulSet" )
6769
68- err := r .fetchAndFinalizePods (ctx , req )
70+ sts := & appsv1.StatefulSet {}
71+ if err := r .client .Get (ctx , req .NamespacedName , sts ); err != nil {
72+ return ctrl.Result {}, client .IgnoreNotFound (err )
73+ }
74+
75+ // Ensure the StatefulSet is an owner of the workload to prevent
76+ // Kubernetes GC from deleting it when pods are replaced during rolling updates.
77+ if err := r .ensureWorkloadOwnership (ctx , sts ); err != nil {
78+ return ctrl.Result {}, err
79+ }
80+
81+ err := r .fetchAndFinalizePods (ctx , req , sts )
6982 return ctrl.Result {}, err
7083}
7184
72- func (r * Reconciler ) fetchAndFinalizePods (ctx context.Context , req reconcile.Request ) error {
85+ // ensureWorkloadOwnership ensures the StatefulSet is an owner of the workload.
86+ // This prevents Kubernetes GC from deleting the workload when pods are replaced
87+ // during rolling updates.
88+ func (r * Reconciler ) ensureWorkloadOwnership (ctx context.Context , sts * appsv1.StatefulSet ) error {
89+ log := ctrl .LoggerFrom (ctx )
90+ workloadName := GetWorkloadName (sts .Name )
91+
92+ wl := & kueue.Workload {}
93+ if err := r .client .Get (ctx , types.NamespacedName {Name : workloadName , Namespace : sts .Namespace }, wl ); err != nil {
94+ if apierrors .IsNotFound (err ) {
95+ return nil
96+ }
97+ return err
98+ }
99+
100+ // Check if StatefulSet is already an owner.
101+ for _ , ref := range wl .GetOwnerReferences () {
102+ if ref .UID == sts .UID {
103+ return nil
104+ }
105+ }
106+
107+ // Add StatefulSet as an owner.
108+ if err := controllerutil .SetOwnerReference (sts , wl , r .client .Scheme ()); err != nil {
109+ return err
110+ }
111+
112+ if err := r .client .Update (ctx , wl ); err != nil {
113+ if apierrors .IsConflict (err ) {
114+ return nil
115+ }
116+ return err
117+ }
118+ log .V (3 ).Info ("Added StatefulSet as owner of workload" ,
119+ "statefulset" , klog .KObj (sts ),
120+ "workload" , klog .KObj (wl ),
121+ )
122+
123+ return nil
124+ }
125+
126+ func (r * Reconciler ) fetchAndFinalizePods (ctx context.Context , req reconcile.Request , sts * appsv1.StatefulSet ) error {
73127 podList := & corev1.PodList {}
74128 if err := r .client .List (ctx , podList , client .InNamespace (req .Namespace ), client.MatchingLabels {
75129 podcontroller .GroupNameLabel : GetWorkloadName (req .Name ),
@@ -82,16 +136,6 @@ func (r *Reconciler) fetchAndFinalizePods(ctx context.Context, req reconcile.Req
82136 return nil
83137 }
84138
85- sts := & appsv1.StatefulSet {}
86- err := r .client .Get (ctx , req .NamespacedName , sts )
87- if client .IgnoreNotFound (err ) != nil {
88- return err
89- }
90-
91- if err != nil {
92- sts = nil
93- }
94-
95139 return r .finalizePods (ctx , sts , podList .Items )
96140}
97141
0 commit comments