
Merge pull request #3741 from whitewindmills/schedule-condition
feat: optimize scheduling condition semantics
karmada-bot authored Jul 18, 2023
2 parents 8388314 + 45c995a commit 71de164
Showing 11 changed files with 329 additions and 84 deletions.
18 changes: 18 additions & 0 deletions pkg/apis/work/v1alpha2/binding_types.go
@@ -307,6 +307,24 @@ const (
FullyApplied string = "FullyApplied"
)

// These are reasons for a binding's transition to a Scheduled condition.
const (
// BindingReasonSuccess reason in Scheduled condition means that the binding has been scheduled successfully.
BindingReasonSuccess = "Success"

// BindingReasonSchedulerError reason in Scheduled condition means that some internal error happened
// during scheduling, for example due to an api-server connection error.
BindingReasonSchedulerError = "SchedulerError"

// BindingReasonNoClusterFit reason in Scheduled condition means that scheduling has finished
// because no cluster fit the binding.
BindingReasonNoClusterFit = "NoClusterFit"

// BindingReasonUnschedulable reason in Scheduled condition means that the scheduler can't schedule
// the binding right now, for example due to insufficient resources in the clusters.
BindingReasonUnschedulable = "Unschedulable"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ResourceBindingList contains a list of ResourceBinding.
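For illustration, a minimal sketch (not part of this commit) of a Scheduled condition built from one of these reasons, using the util.NewCondition helper that appears later in this diff; the message text here is made up:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
	"github.com/karmada-io/karmada/pkg/util"
)

func main() {
	// An Unschedulable condition: the binding cannot be placed right now,
	// but may succeed later once cluster resources free up.
	cond := util.NewCondition(workv1alpha2.Scheduled, workv1alpha2.BindingReasonUnschedulable,
		"Clusters available replicas 2 are not enough to schedule.", metav1.ConditionFalse)
	fmt.Printf("type=%s status=%s reason=%s\n", cond.Type, cond.Status, cond.Reason)
}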
3 changes: 2 additions & 1 deletion pkg/scheduler/core/division_algorithm.go
@@ -7,6 +7,7 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
)
@@ -48,7 +49,7 @@ func getStaticWeightInfoList(clusters []*clusterv1alpha1.Cluster, weightList []p
// dynamicDivideReplicas assigns the total number of replicas to the selected clusters by preference, according to resource availability.
func dynamicDivideReplicas(state *assignState) ([]workv1alpha2.TargetCluster, error) {
if state.availableReplicas < state.targetReplicas {
return nil, fmt.Errorf("clusters resources are not enough to schedule, max %d replicas are support", state.availableReplicas)
return nil, &framework.UnschedulableError{Message: fmt.Sprintf("Clusters available replicas %d are not enough to schedule.", state.availableReplicas)}
}

switch state.strategyType {
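The typed error matters because matching on message strings is fragile; callers can now branch with errors.As. A minimal sketch, assuming only the UnschedulableError type added by this commit:

package main

import (
	"errors"
	"fmt"

	"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

func main() {
	// dynamicDivideReplicas now surfaces capacity shortfalls as a typed error.
	var err error = &framework.UnschedulableError{
		Message: "Clusters available replicas 2 are not enough to schedule.",
	}

	// Callers can detect the case without parsing the message text.
	var unschedulable *framework.UnschedulableError
	if errors.As(err, &unschedulable) {
		fmt.Println("unschedulable, retry later:", unschedulable.Message)
	}
}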
12 changes: 4 additions & 8 deletions pkg/scheduler/core/generic_scheduler.go
@@ -59,13 +59,9 @@ func (g *genericScheduler) Schedule(
scheduleAlgorithmOption *ScheduleAlgorithmOption,
) (result ScheduleResult, err error) {
clusterInfoSnapshot := g.schedulerCache.Snapshot()
- if clusterInfoSnapshot.NumOfClusters() == 0 {
- return result, fmt.Errorf("no clusters available to schedule")
- }

feasibleClusters, diagnosis, err := g.findClustersThatFit(ctx, spec, status, &clusterInfoSnapshot)
if err != nil {
return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
return result, fmt.Errorf("failed to find fit clusters: %w", err)
}

// Short path for case no cluster fit.
@@ -79,19 +75,19 @@ func (g *genericScheduler) Schedule(

clustersScore, err := g.prioritizeClusters(ctx, g.scheduleFramework, spec, feasibleClusters)
if err != nil {
return result, fmt.Errorf("failed to prioritizeClusters: %v", err)
return result, fmt.Errorf("failed to prioritize clusters: %w", err)
}
klog.V(4).Infof("Feasible clusters scores: %v", clustersScore)

clusters, err := g.selectClusters(clustersScore, spec.Placement, spec)
if err != nil {
return result, fmt.Errorf("failed to select clusters: %v", err)
return result, fmt.Errorf("failed to select clusters: %w", err)
}
klog.V(4).Infof("Selected clusters: %v", clusters)

clustersWithReplicas, err := g.assignReplicas(clusters, spec.Placement, spec)
if err != nil {
return result, fmt.Errorf("failed to assignReplicas: %v", err)
return result, fmt.Errorf("failed to assign replicas: %w", err)
}
if scheduleAlgorithmOption.EnableEmptyWorkloadPropagation {
clustersWithReplicas = attachZeroReplicasCluster(clusters, clustersWithReplicas)
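The switch from %v to %w in these messages is load-bearing: getConditionByError (added in pkg/scheduler/helper.go below) classifies errors with errors.As, which only sees through errors wrapped with %w. A small demonstration:

package main

import (
	"errors"
	"fmt"

	"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

func main() {
	fitErr := &framework.FitError{}

	// %v flattens the error to text; the concrete type is lost.
	oldStyle := fmt.Errorf("failed to findClustersThatFit: %v", fitErr)
	// %w keeps fitErr in the chain so errors.As can recover it.
	newStyle := fmt.Errorf("failed to find fit clusters: %w", fitErr)

	var target *framework.FitError
	fmt.Println(errors.As(oldStyle, &target)) // false
	fmt.Println(errors.As(newStyle, &target)) // true
}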
@@ -40,7 +40,7 @@ func (p *APIEnablement) Filter(
) *framework.Result {
if !helper.IsAPIEnabled(cluster.Status.APIEnablements, bindingSpec.Resource.APIVersion, bindingSpec.Resource.Kind) {
klog.V(2).Infof("Cluster(%s) not fit as missing API(%s, kind=%s)", cluster.Name, bindingSpec.Resource.APIVersion, bindingSpec.Resource.Kind)
return framework.NewResult(framework.Unschedulable, "cluster(s) didn't have the API resource")
return framework.NewResult(framework.Unschedulable, "cluster(s) did not have the API resource")
}

return framework.NewResult(framework.Success)
@@ -54,7 +54,7 @@ func (p *ClusterAffinity) Filter(
if util.ClusterMatches(cluster, *affinity) {
return framework.NewResult(framework.Success)
}
return framework.NewResult(framework.Unschedulable, "cluster(s) didn't match the placement cluster affinity constraint")
return framework.NewResult(framework.Unschedulable, "cluster(s) did not match the placement cluster affinity constraint")
}

// If no clusters specified and it is not excluded, mark it matched
@@ -38,11 +38,11 @@ func (p *SpreadConstraint) Filter(
) *framework.Result {
for _, spreadConstraint := range bindingSpec.Placement.SpreadConstraints {
if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldProvider && cluster.Spec.Provider == "" {
return framework.NewResult(framework.Unschedulable, "cluster(s) didn't have provider property")
return framework.NewResult(framework.Unschedulable, "cluster(s) did not have provider property")
} else if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldRegion && cluster.Spec.Region == "" {
return framework.NewResult(framework.Unschedulable, "cluster(s) didn't have region property")
return framework.NewResult(framework.Unschedulable, "cluster(s) did not have region property")
} else if spreadConstraint.SpreadByField == policyv1alpha1.SpreadByFieldZone && cluster.Spec.Zone == "" {
return framework.NewResult(framework.Unschedulable, "cluster(s) didn't have zone property")
return framework.NewResult(framework.Unschedulable, "cluster(s) did not have zone property")
}
}

19 changes: 16 additions & 3 deletions pkg/scheduler/framework/types.go
@@ -9,8 +9,8 @@ import (
)

const (
- // NoClusterAvailableMsg is used to format message when no clusters available.
- NoClusterAvailableMsg = "0/%v clusters are available"
+ // noClusterAvailableMsg is used to format the message when no clusters are available.
+ noClusterAvailableMsg = "0/%d clusters are available: %s."
)

// ClusterToResultMap declares map from cluster name to its Result.
@@ -50,6 +50,10 @@ type FitError struct {

// Error returns detailed information about why the object failed to fit on each cluster
func (f *FitError) Error() string {
if f.NumAllClusters == 0 || len(f.Diagnosis.ClusterToResultMap) == 0 {
return fmt.Sprintf(noClusterAvailableMsg, f.NumAllClusters, "no cluster exists")
}

reasons := make(map[string]int)
for _, result := range f.Diagnosis.ClusterToResultMap {
for _, reason := range result.Reasons() {
@@ -65,6 +69,15 @@
sort.Strings(reasonStrings)
return reasonStrings
}
- reasonMsg := fmt.Sprintf(NoClusterAvailableMsg+": %v.", f.NumAllClusters, strings.Join(sortReasonsHistogram(), ", "))
+ reasonMsg := fmt.Sprintf(noClusterAvailableMsg, f.NumAllClusters, strings.Join(sortReasonsHistogram(), ", "))
return reasonMsg
}

// UnschedulableError describes an unschedulable error, for example due to insufficient resources.
type UnschedulableError struct {
Message string
}

func (u *UnschedulableError) Error() string {
return u.Message
}
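With the new guard at the top of Error(), an empty snapshot now yields a readable message instead of an empty reason list (this also covers the case formerly caught by the deleted NumOfClusters check in generic_scheduler.go). A quick check, with the expected output in the comment:

package main

import (
	"fmt"

	"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

func main() {
	// No clusters in the snapshot and no per-cluster diagnosis recorded.
	fitErr := &framework.FitError{NumAllClusters: 0}

	// Prints: 0/0 clusters are available: no cluster exists.
	fmt.Println(fitErr.Error())
}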
36 changes: 36 additions & 0 deletions pkg/scheduler/helper.go
@@ -2,11 +2,17 @@ package scheduler

import (
"encoding/json"
"errors"
"reflect"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/util"
)

func placementChanged(
@@ -84,3 +90,33 @@ func getAffinityIndex(affinities []policyv1alpha1.ClusterAffinityTerm, observedN
}
return 0
}

// getConditionByError returns the condition corresponding to the error, plus a bool
// indicating whether the error can be ignored (i.e. scheduling should not be retried for it).
func getConditionByError(err error) (metav1.Condition, bool) {
if err == nil {
return util.NewCondition(workv1alpha2.Scheduled, workv1alpha2.BindingReasonSuccess, successfulSchedulingMessage, metav1.ConditionTrue), true
}

var unschedulableErr *framework.UnschedulableError
if errors.As(err, &unschedulableErr) {
return util.NewCondition(workv1alpha2.Scheduled, workv1alpha2.BindingReasonUnschedulable, err.Error(), metav1.ConditionFalse), false
}

fitErrMatcher := func(e error) bool {
var fitErr *framework.FitError
return errors.As(e, &fitErr)
}
if fitErrMatcher(err) {
return util.NewCondition(workv1alpha2.Scheduled, workv1alpha2.BindingReasonNoClusterFit, err.Error(), metav1.ConditionFalse), true
}
var aggregatedErr utilerrors.Aggregate
if errors.As(err, &aggregatedErr) {
for _, ae := range aggregatedErr.Errors() {
if fitErrMatcher(ae) {
// If the aggregated error contains a NoClusterFit error, do not ignore the
// error so that scheduling is retried.
return util.NewCondition(workv1alpha2.Scheduled, workv1alpha2.BindingReasonNoClusterFit, err.Error(), metav1.ConditionFalse), false
}
}
}
return util.NewCondition(workv1alpha2.Scheduled, workv1alpha2.BindingReasonSchedulerError, err.Error(), metav1.ConditionFalse), false
}
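How a caller inside the scheduler might consume the pair of return values is sketched below; recordScheduleResult and patchScheduledCondition are invented names for illustration, not code from this commit. The condition is always recorded, and the bool alone decides whether the binding is requeued:

// Hypothetical caller sketch; the method and the patch helper are
// illustrative stand-ins, not part of this commit.
func (s *Scheduler) recordScheduleResult(rb *workv1alpha2.ResourceBinding, err error) error {
	condition, ignoreErr := getConditionByError(err)
	if updateErr := s.patchScheduledCondition(rb, condition); updateErr != nil {
		return updateErr
	}
	if ignoreErr {
		// Terminal outcome (Success, or NoClusterFit from a plain FitError):
		// do not requeue; a cluster or placement change triggers rescheduling.
		return nil
	}
	// Unschedulable, SchedulerError, or an aggregated NoClusterFit:
	// return the error so the binding is requeued and scheduling retried.
	return err
}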
60 changes: 60 additions & 0 deletions pkg/scheduler/helper_test.go
@@ -2,11 +2,16 @@ package scheduler

import (
"encoding/json"
"errors"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
)

func Test_needConsideredPlacementChanged(t *testing.T) {
@@ -428,3 +433,58 @@ func Test_getAffinityIndex(t *testing.T) {
})
}
}

func Test_getConditionByError(t *testing.T) {
tests := []struct {
name string
err error
expectedCondition metav1.Condition
ignoreErr bool
}{
{
name: "no error",
err: nil,
expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: workv1alpha2.BindingReasonSuccess, Status: metav1.ConditionTrue},
ignoreErr: true,
},
{
name: "failed to schedule",
err: utilerrors.NewAggregate([]error{errors.New("")}),
expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: workv1alpha2.BindingReasonSchedulerError, Status: metav1.ConditionFalse},
ignoreErr: false,
},
{
name: "no cluster fit",
err: &framework.FitError{},
expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: workv1alpha2.BindingReasonNoClusterFit, Status: metav1.ConditionFalse},
ignoreErr: true,
},
{
name: "aggregated fit error",
err: utilerrors.NewAggregate([]error{&framework.FitError{}, errors.New("")}),
expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: workv1alpha2.BindingReasonNoClusterFit, Status: metav1.ConditionFalse},
ignoreErr: false,
},
{
name: "unschedulable error",
err: &framework.UnschedulableError{},
expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: workv1alpha2.BindingReasonUnschedulable, Status: metav1.ConditionFalse},
ignoreErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
condition, ignoreErr := getConditionByError(tt.err)
if condition.Type != tt.expectedCondition.Type ||
condition.Reason != tt.expectedCondition.Reason ||
condition.Status != tt.expectedCondition.Status {
t.Errorf("expected condition: (%s, %s, %s), but got (%s, %s, %s)",
tt.expectedCondition.Type, tt.expectedCondition.Reason, tt.expectedCondition.Status, condition.Type, condition.Reason, condition.Status)
}

if ignoreErr != tt.ignoreErr {
t.Errorf("expect to ignore error: %v. but got: %v", tt.ignoreErr, ignoreErr)
}
})
}
}