From 0e7d26307f6f6070efa5ef91ef9237a0970f378c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Sun, 20 Oct 2024 11:02:33 +0200 Subject: [PATCH] Revert "use two channel to communicate mesos REVIVE" This reverts commit e99d4195bb234bc0c7c2d078b5ff6a684d9012dd. --- core/task/manager.go | 34 +++++++++++++++++++--------------- core/task/scheduler.go | 32 +++++++++++++++++--------------- core/task/schedulerstate.go | 10 +++++----- 3 files changed, 41 insertions(+), 35 deletions(-) diff --git a/core/task/manager.go b/core/task/manager.go index 3ff33f45..95b10a26 100644 --- a/core/task/manager.go +++ b/core/task/manager.go @@ -91,9 +91,8 @@ type Manager struct { tasksToDeploy chan<- *ResourceOffersDeploymentRequest - reviveOffersTrg chan struct{} - reviveOffersDone chan struct{} - cq *controlcommands.CommandQueue + reviveOffersTrg chan struct{} + cq *controlcommands.CommandQueue tasksLaunched int tasksFinished int @@ -142,7 +141,6 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M taskman.cq = taskman.schedulerState.commandqueue taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg - taskman.reviveOffersDone = taskman.schedulerState.reviveOffersDone taskman.ackKilledTasks = newAcks() schedState.setupCli() @@ -158,8 +156,7 @@ func (m *Manager) newTaskForMesosOffer( offer *mesos.Offer, descriptor *Descriptor, localBindMap channel.BindMap, - executorId mesos.ExecutorID, -) (t *Task) { + executorId mesos.ExecutorID) (t *Task) { newId := uid.New().String() t = &Task{ name: fmt.Sprintf("%s#%s", descriptor.TaskClassName, newId), @@ -200,8 +197,8 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass. if err != nil { return } - repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] // get IRepo pointer from RepoManager - if repo == nil { // should never end up here + repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] //get IRepo pointer from RepoManager + if repo == nil { //should never end up here return nil, errors.New("getTaskClassList: repo not found for " + taskClass) } @@ -226,6 +223,7 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass. taskInfo := strings.Split(taskPath, "/tasks/") if len(taskInfo) == 1 { taskFilename = taskInfo[0] + } else { taskFilename = taskInfo[1] } @@ -282,7 +280,7 @@ func (m *Manager) removeInactiveClasses() { return } -func (m *Manager) RemoveReposClasses(repoPath string) { // Currently unused +func (m *Manager) RemoveReposClasses(repoPath string) { //Currently unused utils.EnsureTrailingSlash(&repoPath) _ = m.classes.Do(func(classMap *map[string]*taskclass.Class) error { @@ -329,6 +327,7 @@ func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) { } func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) { + /* Here's what's gonna happen: 1) check if any tasks are already in Roster, whether they are already locked @@ -517,7 +516,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e timeReviveOffers := time.Now() timeDeployMu := time.Now() m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers - <-m.reviveOffersDone // we only continue when it's done + <-m.reviveOffersTrg // we only continue when it's done utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers", log.WithField("tasksToRun", len(tasksToRun)). WithField("partition", envId)) @@ -598,7 +597,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e // can't lock some of them, so we must roll back and keep them // unlocked in the roster. var deployedTaskIds []string - for taskPtr := range deployedTasks { + for taskPtr, _ := range deployedTasks { taskPtr.SetParent(nil) deployedTaskIds = append(deployedTaskIds, taskPtr.taskId) } @@ -613,11 +612,11 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e } // Finally, we write to the roster. Point of no return! - for taskPtr := range deployedTasks { + for taskPtr, _ := range deployedTasks { m.roster.append(taskPtr) } if deploymentSuccess { - for taskPtr := range deployedTasks { + for taskPtr, _ := range deployedTasks { taskPtr.GetParent().SetTask(taskPtr) } for taskPtr, descriptor := range tasksAlreadyRunning { @@ -630,6 +629,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e } func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error { + taskReleaseErrors := make(map[string]error) taskIdsReleased := make([]string, 0) @@ -686,7 +686,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error { taskPath := task.GetParentRolePath() for inbChName, endpoint := range task.GetLocalBindMap() { var bindMapKey string - if strings.HasPrefix(inbChName, "::") { // global channel alias + if strings.HasPrefix(inbChName, "::") { //global channel alias bindMapKey = inbChName // deduplication @@ -785,6 +785,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error { func (m *Manager) transitionTasks(envId uid.ID, tasks Tasks, src string, event string, dest string, commonArgs controlcommands.PropertyMap) error { notify := make(chan controlcommands.MesosCommandResponse) receivers, err := tasks.GetMesosCommandTargets() + if err != nil { return err } @@ -869,6 +870,7 @@ func (m *Manager) TriggerHooks(envId uid.ID, tasks Tasks) error { notify := make(chan controlcommands.MesosCommandResponse) receivers, err := tasks.GetMesosCommandTargets() + if err != nil { return err } @@ -933,6 +935,7 @@ func (m *Manager) GetTask(id string) *Task { } func (m *Manager) updateTaskState(taskId string, state string) { + taskPtr := m.roster.getByTaskId(taskId) if taskPtr == nil { log.WithField("taskId", taskId). @@ -986,7 +989,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) { } if ack, ok := m.ackKilledTasks.getValue(taskId); ok { ack <- struct{}{} - // close(ack) // It can even be left open? + //close(ack) // It can even be left open? } return @@ -1027,6 +1030,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) { // Kill all tasks outside an environment (all unlocked tasks) func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) { + toKill := m.roster.filtered(func(t *Task) bool { return !t.IsLocked() }) diff --git a/core/task/scheduler.go b/core/task/scheduler.go index 389b4b81..eeb2103f 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -84,8 +84,7 @@ var schedEventsCh = make(chan scheduler.Event_Type) func runSchedulerController(ctx context.Context, state *schedulerState, - fidStore store.Singleton, -) error { + fidStore store.Singleton) error { // Set up communication from controller to state machine. go func() { for { @@ -104,7 +103,7 @@ func runSchedulerController(ctx context.Context, for { <-state.reviveOffersTrg doReviveOffers(ctx, state) - state.reviveOffersDone <- struct{}{} + state.reviveOffersTrg <- struct{}{} } }() @@ -273,6 +272,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc { // only one entry in the list, we signal back to commandqueue // otherwise, we log and ignore. return func(ctx context.Context, e *scheduler.Event) (err error) { + mesosMessage := e.GetMessage() if mesosMessage == nil { err = errors.New("message handler got bad MESSAGE") @@ -336,7 +336,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc { return } state.taskman.internalEventCh <- ev - // state.handleDeviceEvent(ev) + //state.handleDeviceEvent(ev) } else { log.WithFields(logrus.Fields{ "type": incomingEvent.Type.String(), @@ -437,7 +437,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han timeResourceOffersCall := time.Now() var ( offers = e.GetOffers().GetOffers() - callOption = calls.RefuseSeconds(time.Second) // calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds) + callOption = calls.RefuseSeconds(time.Second) //calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds) tasksLaunchedThisCycle = 0 offersDeclined = 0 ) @@ -613,7 +613,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han var offerWaitGroup sync.WaitGroup offerWaitGroup.Add(len(offers)) - for offerIndex := range offers { + for offerIndex, _ := range offers { go func(offerIndex int) { defer offerWaitGroup.Done() @@ -1013,6 +1013,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)). WithField("offers", len(offers)). WithField("offerHost", offer.Hostname)) + }(offerIndex) // end for offer closure } // end for _, offer := range offers offerWaitGroup.Wait() @@ -1093,7 +1094,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han machinesUsedSlice := func(machines map[string]struct{}) []string { // StringSet to StringSlice out := make([]string, len(machines)) i := 0 - for k := range machines { + for k, _ := range machines { out[i] = k i++ } @@ -1183,7 +1184,6 @@ func (state *schedulerState) statusUpdate() events.HandlerFunc { // have set through ACCEPT or DECLINE calls, in the hope that Mesos then sends us new resource offers. // This should generally run when we have received a TASK_FINISHED for some tasks, and we have more // tasks to run. - func (state *schedulerState) tryReviveOffers(ctx context.Context) { // limit the rate at which we request offer revival select { @@ -1274,7 +1274,7 @@ func logAllEvents() eventrules.Rule { } } offerIds := make([]string, len(off)) - for i := range off { + for i, _ := range off { offerIds[i] = off[i].GetID().Value } fields["offerIds"] = strings.Join(offerIds, ",") @@ -1335,6 +1335,7 @@ func makeTaskForMesosResources( descriptorDetector string, offerIDsToDecline map[mesos.OfferID]struct{}, ) (*Task, *mesos.TaskInfo) { + bindMap := make(channel.BindMap) for _, ch := range wants.InboundChannels { if ch.Addressing == channel.IPC { @@ -1367,7 +1368,7 @@ func makeTaskForMesosResources( Attributes: offer.Attributes, Hostname: offer.Hostname, } - state.taskman.AgentCache.Update(agentForCache) // thread safe + state.taskman.AgentCache.Update(agentForCache) //thread safe machinesUsed[offer.Hostname] = struct{}{} taskPtr := state.taskman.newTaskForMesosOffer(offer, descriptor, bindMap, targetExecutorId) @@ -1565,11 +1566,12 @@ func makeTaskForMesosResources( ldLibPath, ok := agentForCache.Attributes.Get("executor_env_LD_LIBRARY_PATH") mesosTaskInfo.Executor.Command.Environment = &mesos.Environment{} if ok { - mesosTaskInfo.Executor.Command.Environment.Variables = append(mesosTaskInfo.Executor.Command.Environment.Variables, - mesos.Environment_Variable{ - Name: "LD_LIBRARY_PATH", - Value: proto.String(ldLibPath), - }) + mesosTaskInfo.Executor.Command.Environment.Variables = + append(mesosTaskInfo.Executor.Command.Environment.Variables, + mesos.Environment_Variable{ + Name: "LD_LIBRARY_PATH", + Value: proto.String(ldLibPath), + }) } return taskPtr, &mesosTaskInfo diff --git a/core/task/schedulerstate.go b/core/task/schedulerstate.go index 635480b6..9d783b36 100644 --- a/core/task/schedulerstate.go +++ b/core/task/schedulerstate.go @@ -69,9 +69,8 @@ type schedulerState struct { reviveTokens <-chan struct{} tasksToDeploy chan *ResourceOffersDeploymentRequest - reviveOffersTrg chan struct{} - reviveOffersDone chan struct{} - random *rand.Rand + reviveOffersTrg chan struct{} + random *rand.Rand // shouldn't change at runtime, so thread safe: role string @@ -107,6 +106,8 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) ( tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS) + reviveOffersTrg := make(chan struct{}) + state := &schedulerState{ taskman: taskman, fidStore: fidStore, @@ -116,8 +117,7 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) ( viper.GetDuration("mesosReviveWait"), nil), tasksToDeploy: tasksToDeploy, - reviveOffersTrg: make(chan struct{}), - reviveOffersDone: make(chan struct{}), + reviveOffersTrg: reviveOffersTrg, wantsTaskResources: mesos.Resources{}, executor: executorInfo, metricsAPI: metricsAPI,