
Commit

use two channels to communicate Mesos REVIVE
Michal Tichák committed Oct 16, 2024
1 parent d7ccd5b commit 9d1be98
Showing 3 changed files with 49 additions and 55 deletions.
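
This commit replaces the single reviveOffersTrg channel, previously used both to request a REVIVE call and to signal its completion, with a dedicated pair: reviveOffersTrg carries the request from the task Manager to the scheduler loop, and the new reviveOffersDone carries the acknowledgement back. A plausible motivation, not spelled out in the commit message: with one unbuffered channel serving both directions, a second concurrent caller of acquireTasks could consume the scheduler's completion signal as if it were its own, or have its trigger swallowed by another caller blocked on the completion receive. The following minimal, self-contained sketch shows the resulting handshake; the type and field names mirror the diff, while the loop bodies are simplified stand-ins.

	package main

	import "fmt"

	type schedulerState struct {
		reviveOffersTrg  chan struct{} // Manager -> scheduler: "send REVIVE to Mesos"
		reviveOffersDone chan struct{} // scheduler -> Manager: "REVIVE has been sent"
	}

	func main() {
		state := &schedulerState{
			reviveOffersTrg:  make(chan struct{}),
			reviveOffersDone: make(chan struct{}),
		}

		// Scheduler side, as in runSchedulerController: serve revive requests forever.
		go func() {
			for range state.reviveOffersTrg {
				fmt.Println("REVIVE sent") // stands in for doReviveOffers(ctx, state)
				state.reviveOffersDone <- struct{}{}
			}
		}()

		// Manager side, as in acquireTasks: trigger a revive, then block until done.
		state.reviveOffersTrg <- struct{}{}
		<-state.reviveOffersDone
		fmt.Println("offers revived, deployment can proceed")
	}
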
34 changes: 15 additions & 19 deletions core/task/manager.go
@@ -91,8 +91,9 @@ type Manager struct {

tasksToDeploy chan<- *ResourceOffersDeploymentRequest

- reviveOffersTrg chan struct{}
- cq *controlcommands.CommandQueue
+ reviveOffersTrg chan struct{}
+ reviveOffersDone chan struct{}
+ cq *controlcommands.CommandQueue

tasksLaunched int
tasksFinished int
@@ -141,6 +142,7 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
taskman.cq = taskman.schedulerState.commandqueue
taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
+ taskman.reviveOffersDone = taskman.schedulerState.reviveOffersDone
taskman.ackKilledTasks = newAcks()

schedState.setupCli()
@@ -156,7 +158,8 @@ func (m *Manager) newTaskForMesosOffer(
offer *mesos.Offer,
descriptor *Descriptor,
localBindMap channel.BindMap,
- executorId mesos.ExecutorID) (t *Task) {
+ executorId mesos.ExecutorID,
+ ) (t *Task) {
newId := uid.New().String()
t = &Task{
name: fmt.Sprintf("%s#%s", descriptor.TaskClassName, newId),
@@ -197,8 +200,8 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
if err != nil {
return
}
- repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] //get IRepo pointer from RepoManager
- if repo == nil { //should never end up here
+ repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] // get IRepo pointer from RepoManager
+ if repo == nil { // should never end up here
return nil, errors.New("getTaskClassList: repo not found for " + taskClass)
}

@@ -223,7 +226,6 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
taskInfo := strings.Split(taskPath, "/tasks/")
if len(taskInfo) == 1 {
taskFilename = taskInfo[0]
-
} else {
taskFilename = taskInfo[1]
}
@@ -280,7 +282,7 @@ func (m *Manager) removeInactiveClasses() {
return
}

- func (m *Manager) RemoveReposClasses(repoPath string) { //Currently unused
+ func (m *Manager) RemoveReposClasses(repoPath string) { // Currently unused
utils.EnsureTrailingSlash(&repoPath)

_ = m.classes.Do(func(classMap *map[string]*taskclass.Class) error {
@@ -327,7 +329,6 @@ func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) {
}

func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) {
-
/*
Here's what's gonna happen:
1) check if any tasks are already in Roster, whether they are already locked
@@ -516,7 +517,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
timeReviveOffers := time.Now()
timeDeployMu := time.Now()
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
- <-m.reviveOffersTrg // we only continue when it's done
+ <-m.reviveOffersDone // we only continue when it's done
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))
@@ -597,7 +598,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
// can't lock some of them, so we must roll back and keep them
// unlocked in the roster.
var deployedTaskIds []string
- for taskPtr, _ := range deployedTasks {
+ for taskPtr := range deployedTasks {
taskPtr.SetParent(nil)
deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
}
@@ -612,11 +613,11 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
}

// Finally, we write to the roster. Point of no return!
- for taskPtr, _ := range deployedTasks {
+ for taskPtr := range deployedTasks {
m.roster.append(taskPtr)
}
if deploymentSuccess {
- for taskPtr, _ := range deployedTasks {
+ for taskPtr := range deployedTasks {
taskPtr.GetParent().SetTask(taskPtr)
}
for taskPtr, descriptor := range tasksAlreadyRunning {
@@ -629,7 +630,6 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
}

func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error {
-
taskReleaseErrors := make(map[string]error)
taskIdsReleased := make([]string, 0)

@@ -686,7 +686,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
taskPath := task.GetParentRolePath()
for inbChName, endpoint := range task.GetLocalBindMap() {
var bindMapKey string
- if strings.HasPrefix(inbChName, "::") { //global channel alias
+ if strings.HasPrefix(inbChName, "::") { // global channel alias
bindMapKey = inbChName

// deduplication
@@ -785,7 +785,6 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
func (m *Manager) transitionTasks(envId uid.ID, tasks Tasks, src string, event string, dest string, commonArgs controlcommands.PropertyMap) error {
notify := make(chan controlcommands.MesosCommandResponse)
receivers, err := tasks.GetMesosCommandTargets()
-
if err != nil {
return err
}
@@ -870,7 +869,6 @@ func (m *Manager) TriggerHooks(envId uid.ID, tasks Tasks) error {

notify := make(chan controlcommands.MesosCommandResponse)
receivers, err := tasks.GetMesosCommandTargets()
-
if err != nil {
return err
}
@@ -935,7 +933,6 @@ func (m *Manager) GetTask(id string) *Task {
}

func (m *Manager) updateTaskState(taskId string, state string) {
-
taskPtr := m.roster.getByTaskId(taskId)
if taskPtr == nil {
log.WithField("taskId", taskId).
@@ -989,7 +986,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
}
if ack, ok := m.ackKilledTasks.getValue(taskId); ok {
ack <- struct{}{}
- //close(ack) // It can even be left open?
+ // close(ack) // It can even be left open?
}

return
@@ -1030,7 +1027,6 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {

// Kill all tasks outside an environment (all unlocked tasks)
func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
-
toKill := m.roster.filtered(func(t *Task) bool {
return !t.IsLocked()
})
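
An aside on the updateTaskStatus hunk above, where the commented-out close(ack) carries the question of whether the ack channel "can even be left open": in Go it can. A channel is garbage-collected like any other object once nothing references it; close is only needed when receivers must observe end-of-stream. A small self-contained sketch of that acknowledgement pattern, with a plain map standing in for the real ackKilledTasks type:

	package main

	import "fmt"

	func main() {
		ackKilledTasks := map[string]chan struct{}{
			"task-1": make(chan struct{}, 1),
		}

		// Killer side: wait for the TASK_KILLED status to be processed.
		done := make(chan struct{})
		go func() {
			<-ackKilledTasks["task-1"]
			fmt.Println("kill acknowledged")
			close(done)
		}()

		// Status-update side, as in updateTaskStatus:
		if ack, ok := ackKilledTasks["task-1"]; ok {
			ack <- struct{}{}
			// No close(ack) is required here: once both sides drop their
			// references, the channel is garbage-collected even if left open.
		}
		<-done
	}
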
60 changes: 29 additions & 31 deletions core/task/scheduler.go
@@ -84,7 +84,8 @@ var schedEventsCh = make(chan scheduler.Event_Type)

func runSchedulerController(ctx context.Context,
state *schedulerState,
- fidStore store.Singleton) error {
+ fidStore store.Singleton,
+ ) error {
// Set up communication from controller to state machine.
go func() {
for {
@@ -103,7 +104,7 @@ func runSchedulerController(ctx context.Context,
for {
<-state.reviveOffersTrg
doReviveOffers(ctx, state)
- state.reviveOffersTrg <- struct{}{}
+ state.reviveOffersDone <- struct{}{}
}
}()

@@ -272,7 +273,6 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
// only one entry in the list, we signal back to commandqueue
// otherwise, we log and ignore.
return func(ctx context.Context, e *scheduler.Event) (err error) {
-
mesosMessage := e.GetMessage()
if mesosMessage == nil {
err = errors.New("message handler got bad MESSAGE")
@@ -336,7 +336,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
return
}
state.taskman.internalEventCh <- ev
- //state.handleDeviceEvent(ev)
+ // state.handleDeviceEvent(ev)
} else {
log.WithFields(logrus.Fields{
"type": incomingEvent.Type.String(),
@@ -437,7 +437,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
timeResourceOffersCall := time.Now()
var (
offers = e.GetOffers().GetOffers()
- callOption = calls.RefuseSeconds(time.Second) //calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
+ callOption = calls.RefuseSeconds(time.Second) // calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
tasksLaunchedThisCycle = 0
offersDeclined = 0
)
@@ -613,7 +613,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
var offerWaitGroup sync.WaitGroup
offerWaitGroup.Add(len(offers))

- for offerIndex, _ := range offers {
+ for offerIndex := range offers {
go func(offerIndex int) {
defer offerWaitGroup.Done()

@@ -1013,7 +1013,6 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)).
WithField("offers", len(offers)).
WithField("offerHost", offer.Hostname))
-
}(offerIndex) // end for offer closure
} // end for _, offer := range offers
offerWaitGroup.Wait()
@@ -1094,7 +1093,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
machinesUsedSlice := func(machines map[string]struct{}) []string { // StringSet to StringSlice
out := make([]string, len(machines))
i := 0
- for k, _ := range machines {
+ for k := range machines {
out[i] = k
i++
}
@@ -1180,20 +1179,21 @@ func (state *schedulerState) statusUpdate() events.HandlerFunc {
}
}

- // tryReviveOffers sends a REVIVE call to Mesos. With this we clear all filters we might previously
- // have set through ACCEPT or DECLINE calls, in the hope that Mesos then sends us new resource offers.
- // This should generally run when we have received a TASK_FINISHED for some tasks, and we have more
- // tasks to run.
- func (state *schedulerState) tryReviveOffers(ctx context.Context) {
- // limit the rate at which we request offer revival
- select {
- case <-state.reviveTokens:
- // not done yet, revive offers!
- doReviveOffers(ctx, state)
- default:
- // noop
- }
- }
+ // // tryReviveOffers sends a REVIVE call to Mesos. With this we clear all filters we might previously
+ // // have set through ACCEPT or DECLINE calls, in the hope that Mesos then sends us new resource offers.
+ // // This should generally run when we have received a TASK_FINISHED for some tasks, and we have more
+ // // tasks to run.
+ //
+ // func (state *schedulerState) tryReviveOffers(ctx context.Context) {
+ // // limit the rate at which we request offer revival
+ // select {
+ // case <-state.reviveTokens:
+ // // not done yet, revive offers!
+ // doReviveOffers(ctx, state)
+ // default:
+ // // noop
+ // }
+ // }

func doReviveOffers(ctx context.Context, state *schedulerState) {
err := calls.CallNoData(ctx, state.cli, calls.Revive())
@@ -1274,7 +1274,7 @@ func logAllEvents() eventrules.Rule {
}
}
offerIds := make([]string, len(off))
- for i, _ := range off {
+ for i := range off {
offerIds[i] = off[i].GetID().Value
}
fields["offerIds"] = strings.Join(offerIds, ",")
@@ -1335,7 +1335,6 @@ func makeTaskForMesosResources(
descriptorDetector string,
offerIDsToDecline map[mesos.OfferID]struct{},
) (*Task, *mesos.TaskInfo) {
-
bindMap := make(channel.BindMap)
for _, ch := range wants.InboundChannels {
if ch.Addressing == channel.IPC {
@@ -1368,7 +1367,7 @@ func makeTaskForMesosResources(
Attributes: offer.Attributes,
Hostname: offer.Hostname,
}
- state.taskman.AgentCache.Update(agentForCache) //thread safe
+ state.taskman.AgentCache.Update(agentForCache) // thread safe
machinesUsed[offer.Hostname] = struct{}{}

taskPtr := state.taskman.newTaskForMesosOffer(offer, descriptor, bindMap, targetExecutorId)
@@ -1566,12 +1565,11 @@ func makeTaskForMesosResources(
ldLibPath, ok := agentForCache.Attributes.Get("executor_env_LD_LIBRARY_PATH")
mesosTaskInfo.Executor.Command.Environment = &mesos.Environment{}
if ok {
- mesosTaskInfo.Executor.Command.Environment.Variables =
- append(mesosTaskInfo.Executor.Command.Environment.Variables,
- mesos.Environment_Variable{
- Name: "LD_LIBRARY_PATH",
- Value: proto.String(ldLibPath),
- })
+ mesosTaskInfo.Executor.Command.Environment.Variables = append(mesosTaskInfo.Executor.Command.Environment.Variables,
+ mesos.Environment_Variable{
+ Name: "LD_LIBRARY_PATH",
+ Value: proto.String(ldLibPath),
+ })
}

return taskPtr, &mesosTaskInfo
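
The scheduler.go diff comments out tryReviveOffers rather than deleting it. The function's select with a default case is a non-blocking token check: a REVIVE is sent only if a rate-limit token can be drawn from state.reviveTokens, and the request is otherwise dropped instead of queued. Below is a minimal sketch of that idiom, assuming a ticker-fed token channel in place of the real state.reviveTokens, whose construction lies outside this diff:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		// Assumed stand-in for state.reviveTokens: capacity 1, refilled every 100ms.
		tokens := make(chan struct{}, 1)
		tokens <- struct{}{} // start with one token available
		go func() {
			for range time.Tick(100 * time.Millisecond) {
				select {
				case tokens <- struct{}{}: // refill one token
				default: // bucket already full, drop the refill
				}
			}
		}()

		tryReviveOffers := func() {
			select {
			case <-tokens:
				fmt.Println("REVIVE sent") // stands in for doReviveOffers(ctx, state)
			default:
				fmt.Println("rate limited, revive skipped")
			}
		}

		tryReviveOffers() // consumes the token: "REVIVE sent"
		tryReviveOffers() // immediately after: "rate limited, revive skipped"
		time.Sleep(150 * time.Millisecond)
		tryReviveOffers() // token refilled: "REVIVE sent"
	}
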
10 changes: 5 additions & 5 deletions core/task/schedulerstate.go
@@ -69,8 +69,9 @@ type schedulerState struct {
reviveTokens <-chan struct{}
tasksToDeploy chan *ResourceOffersDeploymentRequest

- reviveOffersTrg chan struct{}
- random *rand.Rand
+ reviveOffersTrg chan struct{}
+ reviveOffersDone chan struct{}
+ random *rand.Rand

// shouldn't change at runtime, so thread safe:
role string
@@ -106,8 +107,6 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (

tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)

- reviveOffersTrg := make(chan struct{})
-
state := &schedulerState{
taskman: taskman,
fidStore: fidStore,
@@ -117,7 +116,8 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
viper.GetDuration("mesosReviveWait"),
nil),
tasksToDeploy: tasksToDeploy,
- reviveOffersTrg: reviveOffersTrg,
+ reviveOffersTrg: make(chan struct{}),
+ reviveOffersDone: make(chan struct{}),
wantsTaskResources: mesos.Resources{},
executor: executorInfo,
metricsAPI: metricsAPI,
