Skip to content

Commit 5f64777

Browse files
committed
Add support for JobSet DependsOn with PodsReady
When a dependsOn is defined on a JobSet and the waitForPodsReady is configured in Kueue, the PodsReady condition should be udpated only for the Jobs that are started, which is not the case currently, all the jobs, including the ones with dependsOn that are not started are being considered when checking if the respective Pods are ready. This commit fixes the issue by ensuring that whenever there is some updated in the job, it means the job has started to be managed and it should be taken into account when defining if all the Pods are ready.
1 parent a469237 commit 5f64777

File tree

4 files changed

+353
-7
lines changed

4 files changed

+353
-7
lines changed

pkg/controller/jobs/jobset/jobset_controller.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,15 +176,26 @@ func (j *JobSet) Finished(ctx context.Context) (message string, success, finishe
176176
}
177177

178178
func (j *JobSet) PodsReady(ctx context.Context) bool {
179+
// Get status of jobs
180+
jobsStatus := map[string]jobsetapi.ReplicatedJobStatus{}
181+
for _, replicatedJobStatus := range j.Status.ReplicatedJobsStatus {
182+
jobsStatus[replicatedJobStatus.Name] = replicatedJobStatus
183+
}
184+
179185
var replicas int32
186+
var ready int32
187+
180188
for _, replicatedJob := range j.Spec.ReplicatedJobs {
181-
replicas += replicatedJob.Replicas
182-
}
183-
var readyReplicas int32
184-
for _, replicatedJobStatus := range j.Status.ReplicatedJobsStatus {
185-
readyReplicas += replicatedJobStatus.Ready + replicatedJobStatus.Succeeded
189+
status, ok := jobsStatus[replicatedJob.Name]
190+
// Register the amount of expected replicas from jobs that already has
191+
// some status updated or doesn't have a dependsOn.
192+
if replicatedJob.DependsOn == nil || (ok && status.Active+status.Failed+status.Ready+status.Succeeded+status.Suspended > 0) {
193+
replicas += replicatedJob.Replicas
194+
}
195+
ready += status.Ready + status.Succeeded
186196
}
187-
return replicas == readyReplicas
197+
198+
return replicas == ready
188199
}
189200

190201
func (j *JobSet) ReclaimablePods(ctx context.Context) ([]kueue.ReclaimablePod, error) {

pkg/controller/jobs/jobset/jobset_controller_test.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,206 @@ func TestPodsReady(t *testing.T) {
101101
).Obj(),
102102
want: false,
103103
},
104+
"job 2 not started, depends on job 1 complete and job 1 is ready": {
105+
jobSet: *testingjobset.MakeJobSet("jobset", "ns").ReplicatedJobs(
106+
testingjobset.ReplicatedJobRequirements{
107+
Name: "replicated-job-1",
108+
Replicas: 2,
109+
Parallelism: 1,
110+
Completions: 1,
111+
},
112+
testingjobset.ReplicatedJobRequirements{
113+
Name: "replicated-job-2",
114+
Replicas: 3,
115+
Parallelism: 1,
116+
Completions: 1,
117+
DependsOn: []jobset.DependsOn{
118+
{
119+
Name: "replicated-job-1",
120+
Status: jobset.DependencyComplete,
121+
},
122+
},
123+
},
124+
).JobsStatus(
125+
jobset.ReplicatedJobStatus{
126+
Name: "replicated-job-1",
127+
Ready: 2,
128+
Active: 2,
129+
Succeeded: 0,
130+
Failed: 0,
131+
Suspended: 0,
132+
},
133+
jobset.ReplicatedJobStatus{
134+
Name: "replicated-job-2",
135+
Active: 0,
136+
Ready: 0,
137+
Succeeded: 0,
138+
Failed: 0,
139+
Suspended: 0,
140+
},
141+
).Obj(),
142+
want: true,
143+
},
144+
"job 2 started, but not ready, depends on job 1 complete and job 1 is completed": {
145+
jobSet: *testingjobset.MakeJobSet("jobset", "ns").ReplicatedJobs(
146+
testingjobset.ReplicatedJobRequirements{
147+
Name: "replicated-job-1",
148+
Replicas: 2,
149+
Parallelism: 1,
150+
Completions: 1,
151+
},
152+
testingjobset.ReplicatedJobRequirements{
153+
Name: "replicated-job-2",
154+
Replicas: 3,
155+
Parallelism: 1,
156+
Completions: 1,
157+
DependsOn: []jobset.DependsOn{
158+
{
159+
Name: "replicated-job-1",
160+
Status: jobset.DependencyComplete,
161+
},
162+
},
163+
},
164+
).JobsStatus(
165+
jobset.ReplicatedJobStatus{
166+
Name: "replicated-job-1",
167+
Ready: 0,
168+
Active: 0,
169+
Succeeded: 2,
170+
Failed: 0,
171+
Suspended: 0,
172+
},
173+
jobset.ReplicatedJobStatus{
174+
Name: "replicated-job-2",
175+
Active: 3,
176+
Ready: 0,
177+
Succeeded: 0,
178+
Failed: 0,
179+
Suspended: 0,
180+
},
181+
).Obj(),
182+
want: false,
183+
},
184+
"job 2 depends on job 1 ready and job 1 is not ready": {
185+
jobSet: *testingjobset.MakeJobSet("jobset", "ns").ReplicatedJobs(
186+
testingjobset.ReplicatedJobRequirements{
187+
Name: "replicated-job-1",
188+
Replicas: 2,
189+
Parallelism: 1,
190+
Completions: 1,
191+
},
192+
testingjobset.ReplicatedJobRequirements{
193+
Name: "replicated-job-2",
194+
Replicas: 3,
195+
Parallelism: 1,
196+
Completions: 1,
197+
DependsOn: []jobset.DependsOn{
198+
{
199+
Name: "replicated-job-1",
200+
Status: jobset.DependencyReady,
201+
},
202+
},
203+
},
204+
).JobsStatus(
205+
jobset.ReplicatedJobStatus{
206+
Name: "replicated-job-1",
207+
Active: 2,
208+
Ready: 0,
209+
Succeeded: 0,
210+
Failed: 0,
211+
Suspended: 0,
212+
},
213+
jobset.ReplicatedJobStatus{
214+
Name: "replicated-job-2",
215+
Active: 0,
216+
Ready: 0,
217+
Succeeded: 0,
218+
Failed: 0,
219+
Suspended: 0,
220+
},
221+
).Obj(),
222+
want: false,
223+
},
224+
"job 2 depends on job 1 ready and job 1 is suspended": {
225+
jobSet: *testingjobset.MakeJobSet("jobset", "ns").ReplicatedJobs(
226+
testingjobset.ReplicatedJobRequirements{
227+
Name: "replicated-job-1",
228+
Replicas: 2,
229+
Parallelism: 1,
230+
Completions: 1,
231+
},
232+
testingjobset.ReplicatedJobRequirements{
233+
Name: "replicated-job-2",
234+
Replicas: 3,
235+
Parallelism: 1,
236+
Completions: 1,
237+
DependsOn: []jobset.DependsOn{
238+
{
239+
Name: "replicated-job-1",
240+
Status: jobset.DependencyReady,
241+
},
242+
},
243+
},
244+
).JobsStatus(
245+
jobset.ReplicatedJobStatus{
246+
Name: "replicated-job-1",
247+
Active: 0,
248+
Ready: 0,
249+
Suspended: 2,
250+
Succeeded: 0,
251+
Failed: 0,
252+
},
253+
jobset.ReplicatedJobStatus{
254+
Name: "replicated-job-2",
255+
Active: 0,
256+
Ready: 0,
257+
Succeeded: 0,
258+
Failed: 0,
259+
Suspended: 0,
260+
},
261+
).Obj(),
262+
want: false,
263+
},
264+
"job 2 depends on job 1 ready and job 1 is ready, but job 2 failed": {
265+
jobSet: *testingjobset.MakeJobSet("jobset", "ns").ReplicatedJobs(
266+
testingjobset.ReplicatedJobRequirements{
267+
Name: "replicated-job-1",
268+
Replicas: 2,
269+
Parallelism: 1,
270+
Completions: 1,
271+
},
272+
testingjobset.ReplicatedJobRequirements{
273+
Name: "replicated-job-2",
274+
Replicas: 3,
275+
Parallelism: 1,
276+
Completions: 1,
277+
DependsOn: []jobset.DependsOn{
278+
{
279+
Name: "replicated-job-1",
280+
Status: jobset.DependencyReady,
281+
},
282+
},
283+
},
284+
).JobsStatus(
285+
jobset.ReplicatedJobStatus{
286+
Name: "replicated-job-1",
287+
Active: 2,
288+
Ready: 2,
289+
Succeeded: 0,
290+
Failed: 0,
291+
Suspended: 0,
292+
},
293+
jobset.ReplicatedJobStatus{
294+
Name: "replicated-job-2",
295+
Active: 0,
296+
Ready: 0,
297+
Failed: 3,
298+
Succeeded: 0,
299+
Suspended: 0,
300+
},
301+
).Obj(),
302+
want: false,
303+
},
104304
}
105305

106306
for name, tc := range testcases {

pkg/util/testingjobs/jobset/wrappers.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type ReplicatedJobRequirements struct {
5555
PodAnnotations map[string]string
5656
Image string
5757
Args []string
58+
DependsOn []jobsetapi.DependsOn
59+
StartupProbe *corev1.Probe
5860
}
5961

6062
// MakeJobSet creates a wrapper for a suspended JobSet
@@ -96,7 +98,11 @@ func (j *JobSetWrapper) ReplicatedJobs(replicatedJobs ...ReplicatedJobRequiremen
9698
},
9799
}
98100
}
99-
j.Spec.ReplicatedJobs[index] = jobsetutil.MakeReplicatedJob(req.Name).Job(jt).Replicas(req.Replicas).Obj()
101+
if req.StartupProbe != nil {
102+
jt.Spec.Template.Spec.Containers[0].StartupProbe = req.StartupProbe
103+
}
104+
replicatedJob := jobsetutil.MakeReplicatedJob(req.Name).Job(jt).Replicas(req.Replicas).DependsOn(req.DependsOn).Obj()
105+
j.Spec.ReplicatedJobs[index] = replicatedJob
100106
}
101107
return j
102108
}

0 commit comments

Comments
 (0)