[core] Improve handling of Mesos resource offers
This commit addresses a timing issue between a Mesos REVIVE call and its corresponding OFFERS event.

Specifically:

* The channel that pipes incoming deployment requests into the OFFERS handler is now buffered.
* An unsatisfied task deployment is now retried up to 3 times before giving up.
* The deployment request is now passed as a pointer.
* The response from the OFFERS handler to task.Manager.acquireTasks now goes through its own channel, one per request.
* The deployment request is now enqueued immediately *before* a REVIVE, as opposed to after, in order to prevent a race with Mesos (see the sketch after this list).
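
For illustration, a minimal, self-contained Go sketch of the resulting pattern: a buffered request channel, a per-request outcome channel, and enqueue-before-revive with bounded retries. The type and variable names below are simplified stand-ins, not the actual code in this commit.

package main

import "fmt"

// Simplified stand-ins for the real core/task types; names are illustrative only.
type outcome struct {
	deployed, requested int
}

type deployRequest struct {
	tasks     int
	outcomeCh chan outcome // per-request response channel
}

const (
	maxConcurrentRequests = 100 // buffered request queue, so enqueueing never blocks the caller
	maxAttempts           = 3   // bounded retries for a deployment that could not be satisfied
)

func main() {
	tasksToDeploy := make(chan *deployRequest, maxConcurrentRequests)
	revive := make(chan struct{})

	// Scheduler side: on each revive/offers round, pick up the pending request and
	// reply on that request's own channel, so concurrent callers cannot receive
	// each other's verdict.
	go func() {
		for {
			<-revive             // wait for a revive trigger from a caller
			revive <- struct{}{} // acknowledge that the revive round has started
			req := <-tasksToDeploy
			// Pretend every offered resource matched and all tasks were deployed.
			req.outcomeCh <- outcome{deployed: req.tasks, requested: req.tasks}
		}
	}()

	// Caller side (acquireTasks): enqueue first, then revive, then wait for the verdict.
	for attempt := 0; attempt < maxAttempts; attempt++ {
		req := &deployRequest{tasks: 4, outcomeCh: make(chan outcome)}
		tasksToDeploy <- req   // enqueue the request *before* reviving offers
		revive <- struct{}{}   // signal the scheduler to revive offers
		<-revive               // continue once the revive round is under way
		out := <-req.outcomeCh // block until this request's outcome arrives
		if out.deployed == out.requested {
			fmt.Printf("attempt %d: all %d tasks deployed\n", attempt+1, out.deployed)
			break
		}
		fmt.Printf("attempt %d: only %d of %d tasks deployed, retrying\n", attempt+1, out.deployed, out.requested)
	}
}

The key point is that each caller waits on a channel it created itself, so a late or out-of-order OFFERS round can no longer deliver a verdict to the wrong caller, and because the request queue is buffered, the request is already visible to the scheduler by the time REVIVE is sent.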
teo committed Jun 28, 2024
1 parent 459a823 commit dd30081
Showing 3 changed files with 127 additions and 100 deletions.
179 changes: 96 additions & 83 deletions core/task/manager.go
@@ -75,6 +75,7 @@ type ResourceOffersOutcome struct {
type ResourceOffersDeploymentRequest struct {
tasksToDeploy Descriptors
envId uid.ID
outcomeCh chan ResourceOffersOutcome
}

type Manager struct {
@@ -88,8 +89,7 @@ type Manager struct {
classes *taskclass.Classes
roster *roster

resourceOffersDone <-chan ResourceOffersOutcome
tasksToDeploy chan<- ResourceOffersDeploymentRequest
tasksToDeploy chan<- *ResourceOffersDeploymentRequest

reviveOffersTrg chan struct{}
cq *controlcommands.CommandQueue
@@ -132,18 +132,17 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
roster: newRoster(),
internalEventCh: internalEventCh,
}
schedulerState, err := NewScheduler(taskman, fidStore, shutdown)
schedState, err := NewScheduler(taskman, fidStore, shutdown)
if err != nil {
return nil, err
}
taskman.schedulerState = schedulerState
taskman.schedulerState = schedState
taskman.cq = taskman.schedulerState.commandqueue
taskman.resourceOffersDone = taskman.schedulerState.resourceOffersDone
taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
taskman.ackKilledTasks = newAcks()

schedulerState.setupCli()
schedState.setupCli()

return
}
@@ -491,93 +490,107 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e

m.deployMu.Lock()

timeReviveOffers := time.Now()
timeDeployMu := time.Now()
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
<-m.reviveOffersTrg // we only continue when it's done
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))
DEPLOYMENT_ATTEMPTS_LOOP:
for attemptCount := 0; attemptCount < MAX_ATTEMPTS_PER_DEPLOY_REQUEST; attemptCount++ {
// We loop through the deployment attempts until we either succeed or
// reach the maximum number of attempts. In the happy case, we should only
// need to try once. A retry should only be necessary if the Mesos master
// has not been able to provide the resources we need in the offers round
// immediately after reviving.
// We also keep track of the number of attempts made in the request object
// so that the scheduler can decide whether to retry or not.
// The request object is used to pass the tasks to deploy and the outcome
// channel to the deployment routine.

outcomeCh := make(chan ResourceOffersOutcome)
m.tasksToDeploy <- &ResourceOffersDeploymentRequest{
tasksToDeploy: tasksToRun,
envId: envId,
outcomeCh: outcomeCh,
} // buffered channel, does not block

m.tasksToDeploy <- ResourceOffersDeploymentRequest{
tasksToDeploy: tasksToRun,
envId: envId,
} // blocks until received

log.WithField("partition", envId).
Debugf("scheduler has received request to deploy %d tasks", len(tasksToRun))

// IDEA: a flps mesos-role assigned to all mesos agents on flp hosts, and then a static
// reservation for that mesos-role on behalf of our scheduler

roOutcome := <-m.resourceOffersDone

utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))

m.deployMu.Unlock()

deployedTasks = roOutcome.deployed
undeployedDescriptors = roOutcome.undeployed
undeployableDescriptors = roOutcome.undeployable

log.WithField("tasks", deployedTasks).
WithField("partition", envId).
Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks))

if len(deployedTasks) != len(tasksToRun) {
// ↑ Not all roles could be deployed. If some were critical,
// we cannot proceed with running this environment. Either way,
// we keep the roles running since they might be useful in the future.
log.WithField("partition", envId).
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))

for _, desc := range undeployedDescriptors {
if desc.TaskRole.GetTaskTraits().Critical == true {
deploymentSuccess = false
undeployedCriticalDescriptors = append(undeployedCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Errorf("critical task deployment failure: %s", printname)
} else {
undeployedNonCriticalDescriptors = append(undeployedNonCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Warnf("non-critical task deployment failure: %s", printname)
Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRun))

timeReviveOffers := time.Now()
timeDeployMu := time.Now()
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
<-m.reviveOffersTrg // we only continue when it's done
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))

roOutcome := <-outcomeCh // blocks until a verdict from resourceOffers comes in

utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))

deployedTasks = roOutcome.deployed
undeployedDescriptors = roOutcome.undeployed
undeployableDescriptors = roOutcome.undeployable

log.WithField("tasks", deployedTasks).
WithField("partition", envId).
Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks))

if len(deployedTasks) != len(tasksToRun) {
// ↑ Not all roles could be deployed. If some were critical,
// we cannot proceed with running this environment. Either way,
// we keep the roles running since they might be useful in the future.
log.WithField("partition", envId).
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))

for _, desc := range undeployedDescriptors {
if desc.TaskRole.GetTaskTraits().Critical == true {
deploymentSuccess = false
undeployedCriticalDescriptors = append(undeployedCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Errorf("critical task deployment failure: %s", printname)
} else {
undeployedNonCriticalDescriptors = append(undeployedNonCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Warnf("non-critical task deployment failure: %s", printname)
}
}
}

for _, desc := range undeployableDescriptors {
if desc.TaskRole.GetTaskTraits().Critical == true {
deploymentSuccess = false
undeployableCriticalDescriptors = append(undeployableCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Errorf("critical task deployment impossible: %s", printname)
go desc.TaskRole.UpdateStatus(UNDEPLOYABLE)
} else {
undeployableNonCriticalDescriptors = append(undeployableNonCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Warnf("non-critical task deployment impossible: %s", printname)
for _, desc := range undeployableDescriptors {
if desc.TaskRole.GetTaskTraits().Critical == true {
deploymentSuccess = false
undeployableCriticalDescriptors = append(undeployableCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Errorf("critical task deployment impossible: %s", printname)
go desc.TaskRole.UpdateStatus(UNDEPLOYABLE)
} else {
undeployableNonCriticalDescriptors = append(undeployableNonCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Warnf("non-critical task deployment impossible: %s", printname)
}
}
}
}
}

if deploymentSuccess {
// ↑ means all the required critical processes are now running,
// and we are ready to update the envId
for taskPtr, descriptor := range deployedTasks {
taskPtr.SetParent(descriptor.TaskRole)
// Ensure everything is filled out properly
if !taskPtr.IsLocked() {
log.WithField("task", taskPtr.taskId).Warning("cannot lock newly deployed task")
deploymentSuccess = false
if deploymentSuccess {
// ↑ means all the required critical processes are now running,
// and we are ready to update the envId
for taskPtr, descriptor := range deployedTasks {
taskPtr.SetParent(descriptor.TaskRole)
// Ensure everything is filled out properly
if !taskPtr.IsLocked() {
log.WithField("task", taskPtr.taskId).Warning("cannot lock newly deployed task")
deploymentSuccess = false
}
}
break DEPLOYMENT_ATTEMPTS_LOOP
}
}
}

m.deployMu.Unlock()

if !deploymentSuccess {
// While all the required roles are running, for some reason we
// can't lock some of them, so we must roll back and keep them
32 changes: 22 additions & 10 deletions core/task/scheduler.go
@@ -463,8 +463,14 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
var descriptorsStillToDeploy Descriptors
envId := uid.NilID()

var deploymentRequestPayload *ResourceOffersDeploymentRequest

// receive deployment request from channel, if any
select {
case deploymentRequestPayload := <-state.tasksToDeploy:
case deploymentRequestPayload = <-state.tasksToDeploy:
if deploymentRequestPayload == nil {
break
}
descriptorsStillToDeploy = deploymentRequestPayload.tasksToDeploy
envId = deploymentRequestPayload.envId

@@ -1051,17 +1057,23 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
}

// Notify listeners...
select {
case state.resourceOffersDone <- ResourceOffersOutcome{tasksDeployed, descriptorsStillToDeploy, descriptorsUndeployable}:
log.WithPrefix("scheduler").
WithField("tasksDeployed", len(tasksDeployed)).
WithField("partition", envId.String()).
Trace("notified listeners on resourceOffers done")
default:
if viper.GetBool("veryVerbose") {
if deploymentRequestPayload != nil {
select {
case deploymentRequestPayload.outcomeCh <- ResourceOffersOutcome{
deployed: tasksDeployed,
undeployed: descriptorsStillToDeploy,
undeployable: descriptorsUndeployable,
}:
log.WithPrefix("scheduler").
WithField("tasksDeployed", len(tasksDeployed)).
WithField("partition", envId.String()).
Trace("no listeners notified")
Trace("notified listeners on resourceOffers done")
default:
if viper.GetBool("veryVerbose") {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
Trace("no listeners notified")
}
}
}

16 changes: 9 additions & 7 deletions core/task/schedulerstate.go
@@ -47,6 +47,11 @@ import (
"github.com/spf13/viper"
)

const (
MAX_CONCURRENT_DEPLOY_REQUESTS = 100
MAX_ATTEMPTS_PER_DEPLOY_REQUEST = 3
)

type schedulerState struct {
sync.RWMutex

@@ -59,10 +64,9 @@ type schedulerState struct {
err error

// not used in multiple goroutines:
executor *mesos.ExecutorInfo
reviveTokens <-chan struct{}
resourceOffersDone chan ResourceOffersOutcome
tasksToDeploy chan ResourceOffersDeploymentRequest
executor *mesos.ExecutorInfo
reviveTokens <-chan struct{}
tasksToDeploy chan *ResourceOffersDeploymentRequest

reviveOffersTrg chan struct{}
random *rand.Rand
@@ -100,8 +104,7 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
return nil, err
}

resourceOffersDone := make(chan ResourceOffersOutcome)
tasksToDeploy := make(chan ResourceOffersDeploymentRequest)
tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)

reviveOffersTrg := make(chan struct{})

@@ -113,7 +116,6 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
viper.GetDuration("mesosReviveWait"),
viper.GetDuration("mesosReviveWait"),
nil),
resourceOffersDone: resourceOffersDone,
tasksToDeploy: tasksToDeploy,
reviveOffersTrg: reviveOffersTrg,
wantsTaskResources: mesos.Resources{},
