Skip to content

Commit f51fd24

Browse files
authored
Automated cherry pick of #6764: Order candidates for FS preemption using less function used to sort workloads (#6800)
* Order candidates for FS preemption using less function used to sort workloads * Move CandidatesOrdering func to common package * Remove ginkgo focus in integration tests
1 parent 87e6238 commit f51fd24

File tree

5 files changed

+149
-54
lines changed

5 files changed

+149
-54
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
Copyright The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package preemptioncommon
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/apimachinery/pkg/api/meta"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
25+
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
26+
"sigs.k8s.io/kueue/pkg/util/priority"
27+
"sigs.k8s.io/kueue/pkg/workload"
28+
)
29+
30+
// candidatesOrdering criteria:
31+
// 0. Workloads already marked for preemption first.
32+
// 1. Workloads from other ClusterQueues in the cohort before the ones in the
33+
// same ClusterQueue as the preemptor.
34+
// 2. Workloads with lower priority first.
35+
// 3. Workloads admitted more recently first.
36+
func CandidatesOrdering(candidates []*workload.Info, cq kueue.ClusterQueueReference, now time.Time) func(int, int) bool {
37+
return func(i, j int) bool {
38+
a := candidates[i]
39+
b := candidates[j]
40+
aEvicted := meta.IsStatusConditionTrue(a.Obj.Status.Conditions, kueue.WorkloadEvicted)
41+
bEvicted := meta.IsStatusConditionTrue(b.Obj.Status.Conditions, kueue.WorkloadEvicted)
42+
if aEvicted != bEvicted {
43+
return aEvicted
44+
}
45+
aInCQ := a.ClusterQueue == cq
46+
bInCQ := b.ClusterQueue == cq
47+
if aInCQ != bInCQ {
48+
return !aInCQ
49+
}
50+
pa := priority.Priority(a.Obj)
51+
pb := priority.Priority(b.Obj)
52+
if pa != pb {
53+
return pa < pb
54+
}
55+
timeA := quotaReservationTime(a.Obj, now)
56+
timeB := quotaReservationTime(b.Obj, now)
57+
if !timeA.Equal(timeB) {
58+
return timeA.After(timeB)
59+
}
60+
// Arbitrary comparison for deterministic sorting.
61+
return a.Obj.UID < b.Obj.UID
62+
}
63+
}
64+
65+
func quotaReservationTime(wl *kueue.Workload, now time.Time) time.Time {
66+
cond := meta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
67+
if cond == nil || cond.Status != metav1.ConditionTrue {
68+
// The condition wasn't populated yet, use the current time.
69+
return now
70+
}
71+
return cond.LastTransitionTime.Time
72+
}

pkg/scheduler/preemption/fairsharing/ordering.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@ package fairsharing
1818

1919
import (
2020
"iter"
21+
"sort"
22+
"time"
2123

24+
"github.com/go-logr/logr"
2225
"k8s.io/apimachinery/pkg/util/sets"
2326

2427
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
2528
"sigs.k8s.io/kueue/pkg/cache"
29+
preemptioncommon "sigs.k8s.io/kueue/pkg/scheduler/preemption/common"
2630
"sigs.k8s.io/kueue/pkg/workload"
2731
)
2832

@@ -54,9 +58,10 @@ type TargetClusterQueueOrdering struct {
5458
// preemption target candidates.
5559
prunedClusterQueues sets.Set[*cache.ClusterQueueSnapshot]
5660
prunedCohorts sets.Set[*cache.CohortSnapshot]
61+
log logr.Logger
5762
}
5863

59-
func MakeClusterQueueOrdering(cq *cache.ClusterQueueSnapshot, candidates []*workload.Info) TargetClusterQueueOrdering {
64+
func MakeClusterQueueOrdering(cq *cache.ClusterQueueSnapshot, candidates []*workload.Info, log logr.Logger) TargetClusterQueueOrdering {
6065
t := TargetClusterQueueOrdering{
6166
preemptorCq: cq,
6267
preemptorAncestors: sets.New[*cache.CohortSnapshot](),
@@ -65,6 +70,7 @@ func MakeClusterQueueOrdering(cq *cache.ClusterQueueSnapshot, candidates []*work
6570

6671
prunedClusterQueues: sets.New[*cache.ClusterQueueSnapshot](),
6772
prunedCohorts: sets.New[*cache.CohortSnapshot](),
73+
log: log,
6874
}
6975

7076
for ancestor := range cq.PathParentToRoot() {
@@ -141,9 +147,18 @@ func (t *TargetClusterQueueOrdering) nextTarget(cohort *cache.CohortSnapshot) *T
141147
drs := cq.DominantResourceShare()
142148
// we can't prune the preemptor ClusterQueue itself,
143149
// until it runs out of candidates.
144-
if (drs == 0 && cq != t.preemptorCq) || !t.hasWorkload(cq) {
150+
switch {
151+
case (drs == 0 && cq != t.preemptorCq) || !t.hasWorkload(cq):
145152
t.prunedClusterQueues.Insert(cq)
146-
} else if drs >= highestCqDrs {
153+
case drs == highestCqDrs:
154+
newCandWl := t.clusterQueueToTarget[cq.GetName()][0]
155+
currentCandWl := t.clusterQueueToTarget[highestCq.GetName()][0]
156+
candidates := []*workload.Info{newCandWl, currentCandWl}
157+
sort.Slice(candidates, preemptioncommon.CandidatesOrdering(candidates, t.preemptorCq.Name, time.Now()))
158+
if candidates[0] == newCandWl {
159+
highestCq = cq
160+
}
161+
case drs > highestCqDrs:
147162
highestCqDrs = drs
148163
highestCq = cq
149164
}

pkg/scheduler/preemption/preemption.go

Lines changed: 6 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@ import (
2121
"fmt"
2222
"sort"
2323
"sync/atomic"
24-
"time"
2524

2625
"github.com/go-logr/logr"
2726
corev1 "k8s.io/api/core/v1"
2827
"k8s.io/apimachinery/pkg/api/meta"
29-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3028
"k8s.io/apimachinery/pkg/util/sets"
3129
"k8s.io/client-go/tools/record"
3230
"k8s.io/client-go/util/workqueue"
@@ -43,6 +41,7 @@ import (
4341
"sigs.k8s.io/kueue/pkg/resources"
4442
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
4543
"sigs.k8s.io/kueue/pkg/scheduler/preemption/classical"
44+
preemptioncommon "sigs.k8s.io/kueue/pkg/scheduler/preemption/common"
4645
"sigs.k8s.io/kueue/pkg/scheduler/preemption/fairsharing"
4746
"sigs.k8s.io/kueue/pkg/util/priority"
4847
"sigs.k8s.io/kueue/pkg/util/routine"
@@ -208,7 +207,7 @@ func (p *Preemptor) classicalPreemptions(preemptionCtx *preemptionCtx) []*Target
208207
Requests: preemptionCtx.workloadUsage.Quota,
209208
WorkloadOrdering: p.workloadOrdering,
210209
}
211-
candidatesGenerator := classical.NewCandidateIterator(hierarchicalReclaimCtx, preemptionCtx.frsNeedPreemption, preemptionCtx.snapshot, p.clock, CandidatesOrdering)
210+
candidatesGenerator := classical.NewCandidateIterator(hierarchicalReclaimCtx, preemptionCtx.frsNeedPreemption, preemptionCtx.snapshot, p.clock, preemptioncommon.CandidatesOrdering)
212211
var attemptPossibleOpts []preemptionAttemptOpts
213212
borrowWithinCohortForbidden, _ := classical.IsBorrowingWithinCohortForbidden(preemptionCtx.preemptorCQ)
214213
// We have three types of candidates:
@@ -299,7 +298,8 @@ func parseStrategies(s []config.PreemptionStrategy) []fairsharing.Strategy {
299298
// and returns (fits, targets, retryCandidates) retryCandidates may be
300299
// used if rule S2-b is configured.
301300
func runFirstFsStrategy(preemptionCtx *preemptionCtx, candidates []*workload.Info, strategy fairsharing.Strategy) (bool, []*Target, []*workload.Info) {
302-
ordering := fairsharing.MakeClusterQueueOrdering(preemptionCtx.preemptorCQ, candidates)
301+
ordering := fairsharing.MakeClusterQueueOrdering(preemptionCtx.preemptorCQ, candidates, preemptionCtx.log)
302+
303303
var targets []*Target
304304
var retryCandidates []*workload.Info
305305
for candCQ := range ordering.Iter() {
@@ -344,7 +344,7 @@ func runFirstFsStrategy(preemptionCtx *preemptionCtx, candidates []*workload.Inf
344344
// runSecondFsStrategy implements Fair Sharing Rule S2-b. It returns
345345
// (fits, targets).
346346
func runSecondFsStrategy(retryCandidates []*workload.Info, preemptionCtx *preemptionCtx, targets []*Target) (bool, []*Target) {
347-
ordering := fairsharing.MakeClusterQueueOrdering(preemptionCtx.preemptorCQ, retryCandidates)
347+
ordering := fairsharing.MakeClusterQueueOrdering(preemptionCtx.preemptorCQ, retryCandidates, preemptionCtx.log)
348348
for candCQ := range ordering.Iter() {
349349
preemptorNewShare, targetOldShare := candCQ.ComputeShares()
350350
// Due to API validation, we can only reach here if the second strategy is LessThanInitialShare,
@@ -373,7 +373,7 @@ func (p *Preemptor) fairPreemptions(preemptionCtx *preemptionCtx, strategies []f
373373
if len(candidates) == 0 {
374374
return nil
375375
}
376-
sort.Slice(candidates, CandidatesOrdering(candidates, preemptionCtx.preemptorCQ.Name, p.clock.Now()))
376+
sort.Slice(candidates, preemptioncommon.CandidatesOrdering(candidates, preemptionCtx.preemptorCQ.Name, p.clock.Now()))
377377
if logV := preemptionCtx.log.V(5); logV.Enabled() {
378378
logV.Info("Simulating fair preemption", "candidates", workload.References(candidates), "resourcesRequiringPreemption", preemptionCtx.frsNeedPreemption.UnsortedList(), "preemptingWorkload", klog.KObj(preemptionCtx.preemptor.Obj))
379379
}
@@ -504,47 +504,3 @@ func queueUnderNominalInResourcesNeedingPreemption(preemptionCtx *preemptionCtx)
504504
}
505505
return true
506506
}
507-
508-
// candidatesOrdering criteria:
509-
// 0. Workloads already marked for preemption first.
510-
// 1. Workloads from other ClusterQueues in the cohort before the ones in the
511-
// same ClusterQueue as the preemptor.
512-
// 2. Workloads with lower priority first.
513-
// 3. Workloads admitted more recently first.
514-
func CandidatesOrdering(candidates []*workload.Info, cq kueue.ClusterQueueReference, now time.Time) func(int, int) bool {
515-
return func(i, j int) bool {
516-
a := candidates[i]
517-
b := candidates[j]
518-
aEvicted := meta.IsStatusConditionTrue(a.Obj.Status.Conditions, kueue.WorkloadEvicted)
519-
bEvicted := meta.IsStatusConditionTrue(b.Obj.Status.Conditions, kueue.WorkloadEvicted)
520-
if aEvicted != bEvicted {
521-
return aEvicted
522-
}
523-
aInCQ := a.ClusterQueue == cq
524-
bInCQ := b.ClusterQueue == cq
525-
if aInCQ != bInCQ {
526-
return !aInCQ
527-
}
528-
pa := priority.Priority(a.Obj)
529-
pb := priority.Priority(b.Obj)
530-
if pa != pb {
531-
return pa < pb
532-
}
533-
timeA := quotaReservationTime(a.Obj, now)
534-
timeB := quotaReservationTime(b.Obj, now)
535-
if !timeA.Equal(timeB) {
536-
return timeA.After(timeB)
537-
}
538-
// Arbitrary comparison for deterministic sorting.
539-
return a.Obj.UID < b.Obj.UID
540-
}
541-
}
542-
543-
func quotaReservationTime(wl *kueue.Workload, now time.Time) time.Time {
544-
cond := meta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
545-
if cond == nil || cond.Status != metav1.ConditionTrue {
546-
// The condition wasn't populated yet, use the current time.
547-
return now
548-
}
549-
return cond.LastTransitionTime.Time
550-
}

pkg/scheduler/preemption/preemption_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"sigs.k8s.io/kueue/pkg/hierarchy"
4747
"sigs.k8s.io/kueue/pkg/resources"
4848
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
49+
preemptioncommon "sigs.k8s.io/kueue/pkg/scheduler/preemption/common"
4950
"sigs.k8s.io/kueue/pkg/util/slices"
5051
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
5152
"sigs.k8s.io/kueue/pkg/workload"
@@ -2824,7 +2825,7 @@ func TestCandidatesOrdering(t *testing.T) {
28242825
ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now.Add(time.Second)).
28252826
Obj()),
28262827
}
2827-
sort.Slice(candidates, CandidatesOrdering(candidates, "self", now))
2828+
sort.Slice(candidates, preemptioncommon.CandidatesOrdering(candidates, "self", now))
28282829
gotNames := make([]string, len(candidates))
28292830
for i, c := range candidates {
28302831
gotNames[i] = workload.Key(c.Obj)

test/integration/singlecluster/scheduler/fairsharing/fair_sharing_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
3030
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
31+
"sigs.k8s.io/kueue/pkg/features"
3132
"sigs.k8s.io/kueue/pkg/metrics"
3233
"sigs.k8s.io/kueue/pkg/util/testing"
3334
"sigs.k8s.io/kueue/test/integration/framework"
@@ -500,6 +501,56 @@ var _ = ginkgo.Describe("Scheduler", func() {
500501
util.ExpectReservingActiveWorkloadsMetric(cqB, 1)
501502
})
502503
})
504+
505+
ginkgo.When("Preemption is enabled in fairsharing and there are best effort and guaranteed workloads", func() {
506+
var (
507+
bestEffortCQA *kueue.ClusterQueue
508+
bestEffortCQB *kueue.ClusterQueue
509+
)
510+
ginkgo.BeforeEach(func() {
511+
bestEffortCQA = createQueue(testing.MakeClusterQueue("best-effort-a").
512+
Cohort("all").
513+
ResourceGroup(
514+
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "0").Obj(),
515+
).Obj())
516+
517+
bestEffortCQB = createQueue(testing.MakeClusterQueue("best-effort-b").
518+
Cohort("all").
519+
ResourceGroup(
520+
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "0").Obj(),
521+
).Obj())
522+
523+
createQueue(testing.MakeClusterQueue("guaranteed").
524+
Cohort("all").
525+
ResourceGroup(
526+
*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "8").Obj(),
527+
).Preemption(kueue.ClusterQueuePreemption{
528+
ReclaimWithinCohort: kueue.PreemptionPolicyAny}).
529+
Obj())
530+
531+
features.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), features.AdmissionFairSharing, false)
532+
})
533+
534+
ginkgo.It("Guaranteed workloads cause preemption of a single best effort workload", func() {
535+
ginkgo.By("Creating two best effort workloads in each best effort CQ")
536+
wlBestEffortA := createWorkload("best-effort-a", "4")
537+
util.WaitForNextSecondAfterCreation(wlBestEffortA)
538+
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wlBestEffortA)
539+
util.ExpectReservingActiveWorkloadsMetric(bestEffortCQA, 1)
540+
wlBestEffortB := createWorkload("best-effort-b", "4")
541+
util.ExpectReservingActiveWorkloadsMetric(bestEffortCQB, 1)
542+
543+
ginkgo.By("Creating a guaranteed workload in the guaranteed CQ, that should reclaim quota")
544+
wlGuaranteed := createWorkload("guaranteed", "4")
545+
546+
util.ExpectWorkloadsToBePreempted(ctx, k8sClient, wlBestEffortB)
547+
util.FinishEvictionForWorkloads(ctx, k8sClient, wlBestEffortB)
548+
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wlGuaranteed)
549+
550+
util.ExpectEvictedWorkloadsTotalMetric(bestEffortCQA.Name, kueue.WorkloadEvictedByPreemption, 0)
551+
util.ExpectEvictedWorkloadsTotalMetric(bestEffortCQB.Name, kueue.WorkloadEvictedByPreemption, 1)
552+
})
553+
})
503554
})
504555

505556
func expectCohortWeightedShare(cohortName string, weightedShare int64) {

0 commit comments

Comments
 (0)