Skip to content

Commit

Permalink
Gang min cardinality (#2994)
Browse files Browse the repository at this point in the history
* Wire in min cardinality (#8)

* Wire up minimum gang cardinality

* Wire up minimum gang cardinality

* Wire up minimum gang cardinality

* Wire up minimum gang cardinality

* Bump armada airflow operator to version 0.5.4 (#2961)

* Bump armada airflow operator to version 0.5.4

Signed-off-by: Rich Scott <[email protected]>

* Regenerate Airflow Operator Markdown doc.

Signed-off-by: Rich Scott <[email protected]>

* Fix regenerated Airflow doc error.

Signed-off-by: Rich Scott <[email protected]>

* Pin versions of all modules, especially around docs generation.

Signed-off-by: Rich Scott <[email protected]>

* Regenerate Airflow docs using Python 3.10

Signed-off-by: Rich Scott <[email protected]>

---------

Signed-off-by: Rich Scott <[email protected]>

* Infer failed jobs from job context, tidy up

* Infer failed jobs from job context, tidy up

* Magefile: Clean all Makefile refernces (#2957)

* tiny naming change

* clean all make refernces

Signed-off-by: mohamed <[email protected]>

---------

Signed-off-by: mohamed <[email protected]>

* Infer failed jobs from job context, tidy up

* Revert to previous unpinned airflow version spec. (#2967)

* Revert to previous unpinned airflow version spec.

Signed-off-by: Rich Scott <[email protected]>

* Increment armada-airflow module version.

Signed-off-by: Rich Scott <[email protected]>

---------

Signed-off-by: Rich Scott <[email protected]>

* Only fail gang jobs when the overall gang min cardinality is set. Fix error handling

* Only fail gang jobs when the overall gang min cardinality is set. Fix error handling

* Only fail gang jobs when the overall gang min cardinality is set. Fix error handling

* Update jobdb with any excess gang jobs that failed

* ArmadaContext.Log Improvements (#2965)

* log error

* context log

* context log

* add cycle id

* typo

* lint

* refactor armadacontext to implement a FieldLogger

---------

Co-authored-by: Chris Martin <[email protected]>

* Fix-up existing tests before adding new ones

* Add new tests for minimum gang sizes

* Test that excess failed gang jobs are committed to jobdb

* Run `on.push` only for master (#2968)

* Run On Push only for master

Signed-off-by: mohamed <[email protected]>

* remove not-workflows

Signed-off-by: mohamed <[email protected]>

---------

Signed-off-by: mohamed <[email protected]>

* Add test for failed job pulsar messages

* Tidy tests

* WIP: Airflow: fix undefined poll_interval in Deferrable Operator (#2975)

* Airflow: handle poll_interval attr in ArmadaJobCompleteTrigger

Fix incomplete handling of 'poll_interval' attribute in
ArmadaJobCompleteTrigger, used by the Armada Deferrable Operator for
Airflow.

Signed-off-by: Rich Scott <[email protected]>

* Airflow - add unit test for armada deferrable operator

Run much of the same tests for the deferrable operator as for the
regular operator, plus test serialization.  Also, update interval
signifier in examples. A full test of the deferrable operator that
verifies the trigger handling is still needed.

Signed-off-by: Rich Scott <[email protected]>

---------

Signed-off-by: Rich Scott <[email protected]>

* Release Airflow Operator v0.5.6 (#2979)

Signed-off-by: Rich Scott <[email protected]>

* #2905 - fix indentation (#2971)

Co-authored-by: Mohamed Abdelfatah <[email protected]>
Co-authored-by: Adam McArthur <[email protected]>

Signed-off-by: Rich Scott <[email protected]>
Signed-off-by: mohamed <[email protected]>
Co-authored-by: Rich Scott <[email protected]>
Co-authored-by: Mohamed Abdelfatah <[email protected]>
Co-authored-by: Chris Martin <[email protected]>
Co-authored-by: Chris Martin <[email protected]>
Co-authored-by: Dave Gantenbein <[email protected]>
Co-authored-by: Adam McArthur <[email protected]>

* Remove unintentional diffs

* Run formatting tools

* Fix formatting

* Re-run proto gen + dotnet gen

* Validate gang min cardinality is <= gang cardinality

* mage proto

* Tidying

---------

Signed-off-by: Rich Scott <[email protected]>
Signed-off-by: mohamed <[email protected]>
Co-authored-by: Mark Sumner <[email protected]>
Co-authored-by: Rich Scott <[email protected]>
Co-authored-by: Mohamed Abdelfatah <[email protected]>
Co-authored-by: Chris Martin <[email protected]>
Co-authored-by: Chris Martin <[email protected]>
Co-authored-by: Dave Gantenbein <[email protected]>
Co-authored-by: Adam McArthur <[email protected]>
Co-authored-by: Albin Severinson <[email protected]>
  • Loading branch information
9 people authored Oct 9, 2023
1 parent 92fec56 commit 8ad1232
Show file tree
Hide file tree
Showing 23 changed files with 1,002 additions and 394 deletions.
38 changes: 30 additions & 8 deletions internal/scheduler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type SchedulerResult struct {
PreemptedJobs []interfaces.LegacySchedulerJob
// Queued jobs that should be scheduled.
ScheduledJobs []interfaces.LegacySchedulerJob
// Queued jobs that could not be scheduled.
// This is used to fail jobs that could not schedule above `minimumGangCardinality`.
FailedJobs []interfaces.LegacySchedulerJob
// For each preempted job, maps the job id to the id of the node on which the job was running.
// For each scheduled job, maps the job id to the id of the node on which the job should be scheduled.
NodeIdByJobId map[string]string
Expand All @@ -32,9 +35,10 @@ type SchedulerResult struct {
SchedulingContexts []*schedulercontext.SchedulingContext
}

func NewSchedulerResult[S ~[]T, T interfaces.LegacySchedulerJob](
func NewSchedulerResultForTest[S ~[]T, T interfaces.LegacySchedulerJob](
preemptedJobs S,
scheduledJobs S,
failedJobs S,
nodeIdByJobId map[string]string,
) *SchedulerResult {
castPreemptedJobs := make([]interfaces.LegacySchedulerJob, len(preemptedJobs))
Expand All @@ -45,10 +49,15 @@ func NewSchedulerResult[S ~[]T, T interfaces.LegacySchedulerJob](
for i, job := range scheduledJobs {
castScheduledJobs[i] = job
}
castFailedJobs := make([]interfaces.LegacySchedulerJob, len(failedJobs))
for i, job := range failedJobs {
castFailedJobs[i] = job
}
return &SchedulerResult{
PreemptedJobs: castPreemptedJobs,
ScheduledJobs: castScheduledJobs,
NodeIdByJobId: nodeIdByJobId,
FailedJobs: castFailedJobs,
}
}

Expand All @@ -72,6 +81,16 @@ func ScheduledJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *Sched
return rv
}

// FailedJobsFromScheduleResult returns the slice of scheduled jobs in the result,
// cast to type T.
func FailedJobsFromSchedulerResult[T interfaces.LegacySchedulerJob](sr *SchedulerResult) []T {
rv := make([]T, len(sr.FailedJobs))
for i, job := range sr.FailedJobs {
rv[i] = job.(T)
}
return rv
}

// JobsSummary returns a string giving an overview of the provided jobs meant for logging.
// For example: "affected queues [A, B]; resources {A: {cpu: 1}, B: {cpu: 2}}; jobs [jobAId, jobBId]".
func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
Expand Down Expand Up @@ -132,22 +151,22 @@ func GangIdAndCardinalityFromLegacySchedulerJob(job interfaces.LegacySchedulerJo
// GangIdAndCardinalityFromAnnotations returns a tuple (gangId, gangCardinality, gangMinimumCardinality, isGangJob, error).
func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, int, int, bool, error) {
if annotations == nil {
return "", 0, 0, false, nil
return "", 1, 1, false, nil
}
gangId, ok := annotations[configuration.GangIdAnnotation]
if !ok {
return "", 0, 0, false, nil
return "", 1, 1, false, nil
}
gangCardinalityString, ok := annotations[configuration.GangCardinalityAnnotation]
if !ok {
return "", 0, 0, false, errors.Errorf("missing annotation %s", configuration.GangCardinalityAnnotation)
return "", 1, 1, false, errors.Errorf("missing annotation %s", configuration.GangCardinalityAnnotation)
}
gangCardinality, err := strconv.Atoi(gangCardinalityString)
if err != nil {
return "", 0, 0, false, errors.WithStack(err)
return "", 1, 1, false, errors.WithStack(err)
}
if gangCardinality <= 0 {
return "", 0, 0, false, errors.Errorf("gang cardinality is non-positive %d", gangCardinality)
return "", 1, 1, false, errors.Errorf("gang cardinality is non-positive %d", gangCardinality)
}
gangMinimumCardinalityString, ok := annotations[configuration.GangMinimumCardinalityAnnotation]
if !ok {
Expand All @@ -156,10 +175,13 @@ func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string,
} else {
gangMinimumCardinality, err := strconv.Atoi(gangMinimumCardinalityString)
if err != nil {
return "", 0, 0, false, errors.WithStack(err)
return "", 1, 1, false, errors.WithStack(err)
}
if gangMinimumCardinality <= 0 {
return "", 0, 0, false, errors.Errorf("gang minimum cardinality is non-positive %d", gangMinimumCardinality)
return "", 1, 1, false, errors.Errorf("gang minimum cardinality is non-positive %d", gangMinimumCardinality)
}
if gangMinimumCardinality > gangCardinality {
return "", 1, 1, false, errors.Errorf("gang minimum cardinality %d cannot be greater than gang cardinality %d", gangMinimumCardinality, gangCardinality)
}
return gangId, gangCardinality, gangMinimumCardinality, true, nil
}
Expand Down
47 changes: 31 additions & 16 deletions internal/scheduler/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,20 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string {
func (sctx *SchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingContext) (bool, error) {
allJobsEvictedInThisRound := true
allJobsSuccessful := true
numberOfSuccessfulJobs := 0
for _, jctx := range gctx.JobSchedulingContexts {
evictedInThisRound, err := sctx.AddJobSchedulingContext(jctx)
if err != nil {
return false, err
}
allJobsEvictedInThisRound = allJobsEvictedInThisRound && evictedInThisRound
allJobsSuccessful = allJobsSuccessful && jctx.IsSuccessful()
isSuccess := jctx.IsSuccessful()
allJobsSuccessful = allJobsSuccessful && isSuccess
if isSuccess {
numberOfSuccessfulJobs++
}
}
if allJobsSuccessful && !allJobsEvictedInThisRound {
if numberOfSuccessfulJobs >= gctx.GangMinCardinality && !allJobsEvictedInThisRound {
sctx.NumScheduledGangs++
}
return allJobsEvictedInThisRound, nil
Expand Down Expand Up @@ -458,15 +463,6 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
return sb.String()
}

func (qctx *QueueSchedulingContext) AddGangSchedulingContext(gctx *GangSchedulingContext) error {
for _, jctx := range gctx.JobSchedulingContexts {
if _, err := qctx.AddJobSchedulingContext(jctx); err != nil {
return err
}
}
return nil
}

// AddJobSchedulingContext adds a job scheduling context.
// Automatically updates scheduled resources.
func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContext) (bool, error) {
Expand Down Expand Up @@ -542,6 +538,7 @@ type GangSchedulingContext struct {
TotalResourceRequests schedulerobjects.ResourceList
AllJobsEvicted bool
NodeUniformityLabel string
GangMinCardinality int
}

func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingContext {
Expand All @@ -550,12 +547,14 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont
queue := ""
priorityClassName := ""
nodeUniformityLabel := ""
gangMinCardinality := 1
if len(jctxs) > 0 {
queue = jctxs[0].Job.GetQueue()
priorityClassName = jctxs[0].Job.GetPriorityClassName()
if jctxs[0].PodRequirements != nil {
nodeUniformityLabel = jctxs[0].PodRequirements.Annotations[configuration.GangNodeUniformityLabelAnnotation]
}
gangMinCardinality = jctxs[0].GangMinCardinality
}
allJobsEvicted := true
totalResourceRequests := schedulerobjects.NewResourceList(4)
Expand All @@ -571,6 +570,7 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont
TotalResourceRequests: totalResourceRequests,
AllJobsEvicted: allJobsEvicted,
NodeUniformityLabel: nodeUniformityLabel,
GangMinCardinality: gangMinCardinality,
}
}

Expand Down Expand Up @@ -600,6 +600,10 @@ type JobSchedulingContext struct {
UnschedulableReason string
// Pod scheduling contexts for the individual pods that make up the job.
PodSchedulingContext *PodSchedulingContext
// The minimum size of the gang associated with this job.
GangMinCardinality int
// If set, indicates this job should be failed back to the client when the gang is scheduled.
ShouldFail bool
}

func (jctx *JobSchedulingContext) String() string {
Expand All @@ -615,6 +619,7 @@ func (jctx *JobSchedulingContext) String() string {
if jctx.PodSchedulingContext != nil {
fmt.Fprint(w, jctx.PodSchedulingContext.String())
}
fmt.Fprintf(w, "GangMinCardinality:\t%d\n", jctx.GangMinCardinality)
w.Flush()
return sb.String()
}
Expand All @@ -623,15 +628,25 @@ func (jctx *JobSchedulingContext) IsSuccessful() bool {
return jctx.UnschedulableReason == ""
}

func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext {
func JobSchedulingContextsFromJobs[J interfaces.LegacySchedulerJob](priorityClasses map[string]types.PriorityClass, jobs []J, extractGangInfo func(map[string]string) (string, int, int, bool, error)) []*JobSchedulingContext {
jctxs := make([]*JobSchedulingContext, len(jobs))
timestamp := time.Now()

for i, job := range jobs {
// TODO: Move min cardinality to gang context only and remove from here.
// Requires re-phrasing nodedb in terms of gang context, as well as feeding the value extracted from the annotations downstream.
_, _, gangMinCardinality, _, err := extractGangInfo(job.GetAnnotations())
if err != nil {
gangMinCardinality = 1
}

jctxs[i] = &JobSchedulingContext{
Created: timestamp,
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(priorityClasses),
Created: timestamp,
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(priorityClasses),
GangMinCardinality: gangMinCardinality,
ShouldFail: false,
}
}
return jctxs
Expand Down
7 changes: 4 additions & 3 deletions internal/scheduler/context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ func testNSmallCpuJobSchedulingContext(queue, priorityClassName string, n int) [
func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSchedulingContext {
job := testfixtures.Test1Cpu4GiJob(queue, priorityClassName)
return &JobSchedulingContext{
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(testfixtures.TestPriorityClasses),
JobId: job.GetId(),
Job: job,
PodRequirements: job.GetPodRequirements(testfixtures.TestPriorityClasses),
GangMinCardinality: 1,
}
}
126 changes: 82 additions & 44 deletions internal/scheduler/gang_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,55 @@ func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
sch.skipUnsuccessfulSchedulingKeyCheck = true
}

func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) {
// Exit immediately if this is a new gang and we've exceeded any round limits.
func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *schedulercontext.GangSchedulingContext, gangAddedToSchedulingContext bool, unschedulableReason string) (err error) {
if gangAddedToSchedulingContext {
failedJobs := util.Map(gctx.JobSchedulingContexts, func(jctx *schedulercontext.JobSchedulingContext) interfaces.LegacySchedulerJob { return jctx.Job })
if _, err = sch.schedulingContext.EvictGang(failedJobs); err != nil {
return
}
}

for _, jctx := range gctx.JobSchedulingContexts {
jctx.UnschedulableReason = unschedulableReason
}

if _, err = sch.schedulingContext.AddGangSchedulingContext(gctx); err != nil {
return
}

// Register unfeasible scheduling keys.
//
// Because this check occurs before adding the gctx to the sctx,
// the round limits can be exceeded by one gang.
// Only record unfeasible scheduling keys for single-job gangs.
// Since a gang may be unschedulable even if all its members are individually schedulable.
if !sch.skipUnsuccessfulSchedulingKeyCheck && gctx.Cardinality() == 1 {
jctx := gctx.JobSchedulingContexts[0]
schedulingKey := sch.schedulingContext.SchedulingKeyFromLegacySchedulerJob(jctx.Job)
if _, ok := sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; !ok {
// Keep the first jctx for each unfeasible schedulingKey.
sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey] = jctx
}
}

return
}

func (sch *GangScheduler) updateGangSchedulingContextOnSuccess(gctx *schedulercontext.GangSchedulingContext, gangAddedToSchedulingContext bool) (err error) {
if gangAddedToSchedulingContext {
jobs := util.Map(gctx.JobSchedulingContexts, func(jctx *schedulercontext.JobSchedulingContext) interfaces.LegacySchedulerJob { return jctx.Job })
if _, err = sch.schedulingContext.EvictGang(jobs); err != nil {
return
}
}

if _, err = sch.schedulingContext.AddGangSchedulingContext(gctx); err != nil {
return
}

return
}

func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) {
// Exit immediately if this is a new gang and we've hit any round limits.
if !gctx.AllJobsEvicted {
if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext, gctx.Queue); err != nil || !ok {
return
Expand All @@ -66,36 +110,15 @@ func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulerco
}
}

// Process unschedulable jobs.
if !ok {
// Register the job as unschedulable. If the job was added to the context, remove it first.
if gangAddedToSchedulingContext {
jobs := util.Map(gctx.JobSchedulingContexts, func(jctx *schedulercontext.JobSchedulingContext) interfaces.LegacySchedulerJob { return jctx.Job })
if _, err = sch.schedulingContext.EvictGang(jobs); err != nil {
return
}
}
for _, jctx := range gctx.JobSchedulingContexts {
jctx.UnschedulableReason = unschedulableReason
}
if _, err = sch.schedulingContext.AddGangSchedulingContext(gctx); err != nil {
return
}

// Register unfeasible scheduling keys.
//
// Only record unfeasible scheduling keys for single-job gangs.
// Since a gang may be unschedulable even if all its members are individually schedulable.
if !sch.skipUnsuccessfulSchedulingKeyCheck && gctx.Cardinality() == 1 {
jctx := gctx.JobSchedulingContexts[0]
schedulingKey := sch.schedulingContext.SchedulingKeyFromLegacySchedulerJob(jctx.Job)
if _, ok := sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; !ok {
// Keep the first jctx for each unfeasible schedulingKey.
sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey] = jctx
}
}
if ok {
err = sch.updateGangSchedulingContextOnSuccess(gctx, gangAddedToSchedulingContext)
} else {
err = sch.updateGangSchedulingContextOnFailure(gctx, gangAddedToSchedulingContext, unschedulableReason)
}

return
}()

if _, err = sch.schedulingContext.AddGangSchedulingContext(gctx); err != nil {
return
}
Expand Down Expand Up @@ -186,23 +209,38 @@ func (sch *GangScheduler) tryScheduleGang(ctx *armadacontext.Context, gctx *sche
return
}

func (sch *GangScheduler) tryScheduleGangWithTxn(ctx *armadacontext.Context, txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) {
if ok, err = sch.nodeDb.ScheduleManyWithTxn(txn, gctx.JobSchedulingContexts); err != nil {
return
} else if !ok {
for _, jctx := range gctx.JobSchedulingContexts {
if jctx.PodSchedulingContext != nil {
// Clear any node bindings on failure to schedule.
jctx.PodSchedulingContext.NodeId = ""
func clearNodeBindings(jctx *schedulercontext.JobSchedulingContext) {
if jctx.PodSchedulingContext != nil {
// Clear any node bindings on failure to schedule.
jctx.PodSchedulingContext.NodeId = ""
}
}

func (sch *GangScheduler) tryScheduleGangWithTxn(_ *armadacontext.Context, txn *memdb.Txn, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) {
if ok, err = sch.nodeDb.ScheduleManyWithTxn(txn, gctx.JobSchedulingContexts); err == nil {
if !ok {
for _, jctx := range gctx.JobSchedulingContexts {
clearNodeBindings(jctx)
}

if gctx.Cardinality() > 1 {
unschedulableReason = "unable to schedule gang since minimum cardinality not met"
} else {
unschedulableReason = "job does not fit on any node"
}
}
if gctx.Cardinality() > 1 {
unschedulableReason = "at least one job in the gang does not fit on any node"
} else {
unschedulableReason = "job does not fit on any node"
// When a gang schedules successfully, update state for failed jobs if they exist.
for _, jctx := range gctx.JobSchedulingContexts {
if jctx.ShouldFail {
clearNodeBindings(jctx)
jctx.UnschedulableReason = "job does not fit on any node"
}
}
}

return
}

return
}

Expand Down
Loading

0 comments on commit 8ad1232

Please sign in to comment.