Skip to content

Commit 26585c7

Browse files
committed
Add support for JobSet DependsOn with PodsReady
1 parent 930a474 commit 26585c7

File tree

1 file changed

+32
-6
lines changed

1 file changed

+32
-6
lines changed

pkg/controller/jobs/jobset/jobset_controller.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,15 +176,26 @@ func (j *JobSet) Finished(ctx context.Context) (message string, success, finishe
176176
}
177177

178178
func (j *JobSet) PodsReady(ctx context.Context) bool {
179+
// Get status of jobs
180+
jobsStatus := map[string]jobsetapi.ReplicatedJobStatus{}
181+
for _, replicatedJobStatus := range j.Status.ReplicatedJobsStatus {
182+
jobsStatus[replicatedJobStatus.Name] = replicatedJobStatus
183+
}
179184
var replicas int32
185+
var ready int32
186+
180187
for _, replicatedJob := range j.Spec.ReplicatedJobs {
181-
replicas += replicatedJob.Replicas
182-
}
183-
var readyReplicas int32
184-
for _, replicatedJobStatus := range j.Status.ReplicatedJobsStatus {
185-
readyReplicas += replicatedJobStatus.Ready + replicatedJobStatus.Succeeded
188+
status := jobsStatus[replicatedJob.Name]
189+
// Register the amount of expected replicas from jobs that are supposed to ready,
190+
// which are the ones that do not dependent on other jobs and the ones that depend on
191+
// other jobs but already have the dependency satisfied
192+
if len(replicatedJob.DependsOn) == 0 || (len(replicatedJob.DependsOn) > 0 && isJobDependencySatisfied(replicatedJob, jobsStatus)) {
193+
replicas += replicatedJob.Replicas
194+
}
195+
ready += status.Ready + status.Succeeded
196+
186197
}
187-
return replicas == readyReplicas
198+
return replicas == ready
188199
}
189200

190201
func (j *JobSet) ReclaimablePods(ctx context.Context) ([]kueue.ReclaimablePod, error) {
@@ -243,3 +254,18 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
243254
func GetWorkloadNameForJobSet(jobSetName string, jobSetUID types.UID) string {
244255
return jobframework.GetWorkloadNameForOwnerWithGVK(jobSetName, jobSetUID, gvk)
245256
}
257+
258+
func isJobDependencySatisfied(replicatedJob jobsetapi.ReplicatedJob, jobsStatus map[string]jobsetapi.ReplicatedJobStatus) bool {
259+
for _, dependsOn := range replicatedJob.DependsOn {
260+
if dependsOn.Status == jobsetapi.DependencyReady {
261+
if status, found := jobsStatus[dependsOn.Name]; found && status.Ready == 0 {
262+
return false
263+
}
264+
} else if dependsOn.Status == jobsetapi.DependencyComplete {
265+
if status, found := jobsStatus[dependsOn.Name]; found && status.Succeeded == 0 {
266+
return false
267+
}
268+
}
269+
}
270+
return true
271+
}

0 commit comments

Comments
 (0)