diff --git a/core/task/manager.go b/core/task/manager.go
index 95b10a26..3ff33f45 100644
--- a/core/task/manager.go
+++ b/core/task/manager.go
@@ -91,8 +91,9 @@ type Manager struct {
 	tasksToDeploy chan<- *ResourceOffersDeploymentRequest
 
-	reviveOffersTrg chan struct{}
-	cq              *controlcommands.CommandQueue
+	reviveOffersTrg  chan struct{}
+	reviveOffersDone chan struct{}
+	cq               *controlcommands.CommandQueue
 
 	tasksLaunched int
 	tasksFinished int
@@ -141,6 +142,7 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
 	taskman.cq = taskman.schedulerState.commandqueue
 	taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
 	taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
+	taskman.reviveOffersDone = taskman.schedulerState.reviveOffersDone
 	taskman.ackKilledTasks = newAcks()
 
 	schedState.setupCli()
@@ -156,7 +158,8 @@ func (m *Manager) newTaskForMesosOffer(
 	offer *mesos.Offer,
 	descriptor *Descriptor,
 	localBindMap channel.BindMap,
-	executorId mesos.ExecutorID) (t *Task) {
+	executorId mesos.ExecutorID,
+) (t *Task) {
 	newId := uid.New().String()
 	t = &Task{
 		name: fmt.Sprintf("%s#%s", descriptor.TaskClassName, newId),
@@ -197,8 +200,8 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
 		if err != nil {
 			return
 		}
-		repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] //get IRepo pointer from RepoManager
-		if repo == nil { //should never end up here
+		repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] // get IRepo pointer from RepoManager
+		if repo == nil { // should never end up here
			return nil, errors.New("getTaskClassList: repo not found for " + taskClass)
 		}
 
@@ -223,7 +226,6 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
 		taskInfo := strings.Split(taskPath, "/tasks/")
 		if len(taskInfo) == 1 {
 			taskFilename = taskInfo[0]
-
 		} else {
 			taskFilename = taskInfo[1]
 		}
@@ -280,7 +282,7 @@ func (m *Manager) removeInactiveClasses() {
 	return
 }
 
-func (m *Manager) RemoveReposClasses(repoPath string) { //Currently unused
+func (m *Manager) RemoveReposClasses(repoPath string) { // Currently unused
 	utils.EnsureTrailingSlash(&repoPath)
 
 	_ = m.classes.Do(func(classMap *map[string]*taskclass.Class) error {
@@ -327,7 +329,6 @@ func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) {
 }
 
 func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) {
-
 	/*
 		Here's what's gonna happen:
 		1) check if any tasks are already in Roster, whether they are already locked
@@ -516,7 +517,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 	timeReviveOffers := time.Now()
 	timeDeployMu := time.Now()
 	m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
-	<-m.reviveOffersTrg             // we only continue when it's done
+	<-m.reviveOffersDone            // we only continue when it's done
 	utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
 		log.WithField("tasksToRun", len(tasksToRun)).
 			WithField("partition", envId))
@@ -597,7 +598,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 		// can't lock some of them, so we must roll back and keep them
 		// unlocked in the roster.
 		var deployedTaskIds []string
-		for taskPtr, _ := range deployedTasks {
+		for taskPtr := range deployedTasks {
 			taskPtr.SetParent(nil)
 			deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
 		}
@@ -612,11 +613,11 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 	}
 
 	// Finally, we write to the roster. Point of no return!
-	for taskPtr, _ := range deployedTasks {
+	for taskPtr := range deployedTasks {
 		m.roster.append(taskPtr)
 	}
 	if deploymentSuccess {
-		for taskPtr, _ := range deployedTasks {
+		for taskPtr := range deployedTasks {
 			taskPtr.GetParent().SetTask(taskPtr)
 		}
 		for taskPtr, descriptor := range tasksAlreadyRunning {
@@ -629,7 +630,6 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 }
 
 func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error {
-
 	taskReleaseErrors := make(map[string]error)
 	taskIdsReleased := make([]string, 0)
 
@@ -686,7 +686,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
 		taskPath := task.GetParentRolePath()
 		for inbChName, endpoint := range task.GetLocalBindMap() {
 			var bindMapKey string
-			if strings.HasPrefix(inbChName, "::") { //global channel alias
+			if strings.HasPrefix(inbChName, "::") { // global channel alias
 				bindMapKey = inbChName
 
 				// deduplication
@@ -785,7 +785,6 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
 func (m *Manager) transitionTasks(envId uid.ID, tasks Tasks, src string, event string, dest string, commonArgs controlcommands.PropertyMap) error {
 	notify := make(chan controlcommands.MesosCommandResponse)
 	receivers, err := tasks.GetMesosCommandTargets()
-
 	if err != nil {
 		return err
 	}
@@ -870,7 +869,6 @@ func (m *Manager) TriggerHooks(envId uid.ID, tasks Tasks) error {
 
 	notify := make(chan controlcommands.MesosCommandResponse)
 	receivers, err := tasks.GetMesosCommandTargets()
-
 	if err != nil {
 		return err
 	}
@@ -935,7 +933,6 @@ func (m *Manager) GetTask(id string) *Task {
 }
 
 func (m *Manager) updateTaskState(taskId string, state string) {
-
 	taskPtr := m.roster.getByTaskId(taskId)
 	if taskPtr == nil {
 		log.WithField("taskId", taskId).
@@ -989,7 +986,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
 		}
 		if ack, ok := m.ackKilledTasks.getValue(taskId); ok {
 			ack <- struct{}{}
-			//close(ack) // It can even be left open?
+			// close(ack) // It can even be left open?
 		}
 
 		return
@@ -1030,7 +1027,6 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
 
 // Kill all tasks outside an environment (all unlocked tasks)
 func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
-
 	toKill := m.roster.filtered(func(t *Task) bool {
 		return !t.IsLocked()
 	})
diff --git a/core/task/scheduler.go b/core/task/scheduler.go
index eeb2103f..389b4b81 100644
--- a/core/task/scheduler.go
+++ b/core/task/scheduler.go
@@ -84,7 +84,8 @@ var schedEventsCh = make(chan scheduler.Event_Type)
 
 func runSchedulerController(ctx context.Context,
 	state *schedulerState,
-	fidStore store.Singleton) error {
+	fidStore store.Singleton,
+) error {
 	// Set up communication from controller to state machine.
 	go func() {
 		for {
@@ -103,7 +104,7 @@ func runSchedulerController(ctx context.Context,
 		for {
 			<-state.reviveOffersTrg
 			doReviveOffers(ctx, state)
-			state.reviveOffersTrg <- struct{}{}
+			state.reviveOffersDone <- struct{}{}
 		}
 	}()
 
@@ -272,7 +273,6 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
 	// only one entry in the list, we signal back to commandqueue
 	// otherwise, we log and ignore.
 	return func(ctx context.Context, e *scheduler.Event) (err error) {
-
 		mesosMessage := e.GetMessage()
 		if mesosMessage == nil {
 			err = errors.New("message handler got bad MESSAGE")
@@ -336,7 +336,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
 				return
 			}
 			state.taskman.internalEventCh <- ev
-			//state.handleDeviceEvent(ev)
+			// state.handleDeviceEvent(ev)
 		} else {
 			log.WithFields(logrus.Fields{
 				"type": incomingEvent.Type.String(),
@@ -437,7 +437,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 		timeResourceOffersCall := time.Now()
 		var (
 			offers                 = e.GetOffers().GetOffers()
-			callOption             = calls.RefuseSeconds(time.Second) //calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
+			callOption             = calls.RefuseSeconds(time.Second) // calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
 			tasksLaunchedThisCycle = 0
 			offersDeclined         = 0
 		)
@@ -613,7 +613,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 		var offerWaitGroup sync.WaitGroup
 		offerWaitGroup.Add(len(offers))
 
-		for offerIndex, _ := range offers {
+		for offerIndex := range offers {
 			go func(offerIndex int) {
 				defer offerWaitGroup.Done()
 
@@ -1013,7 +1013,6 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 					WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)).
 					WithField("offers", len(offers)).
 					WithField("offerHost", offer.Hostname))
-
 			}(offerIndex) // end for offer closure
 		} // end for _, offer := range offers
 		offerWaitGroup.Wait()
@@ -1094,7 +1093,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 		machinesUsedSlice := func(machines map[string]struct{}) []string { // StringSet to StringSlice
 			out := make([]string, len(machines))
 			i := 0
-			for k, _ := range machines {
+			for k := range machines {
 				out[i] = k
 				i++
 			}
@@ -1184,6 +1183,7 @@ func (state *schedulerState) statusUpdate() events.HandlerFunc {
 // have set through ACCEPT or DECLINE calls, in the hope that Mesos then sends us new resource offers.
 // This should generally run when we have received a TASK_FINISHED for some tasks, and we have more
 // tasks to run.
+
 func (state *schedulerState) tryReviveOffers(ctx context.Context) {
 	// limit the rate at which we request offer revival
 	select {
@@ -1274,7 +1274,7 @@ func logAllEvents() eventrules.Rule {
 			}
 		}
 		offerIds := make([]string, len(off))
-		for i, _ := range off {
+		for i := range off {
 			offerIds[i] = off[i].GetID().Value
 		}
 		fields["offerIds"] = strings.Join(offerIds, ",")
@@ -1335,7 +1335,6 @@ func makeTaskForMesosResources(
 	descriptorDetector string,
 	offerIDsToDecline map[mesos.OfferID]struct{},
 ) (*Task, *mesos.TaskInfo) {
-
 	bindMap := make(channel.BindMap)
 	for _, ch := range wants.InboundChannels {
 		if ch.Addressing == channel.IPC {
@@ -1368,7 +1367,7 @@ func makeTaskForMesosResources(
 		Attributes: offer.Attributes,
 		Hostname:   offer.Hostname,
 	}
-	state.taskman.AgentCache.Update(agentForCache) //thread safe
+	state.taskman.AgentCache.Update(agentForCache) // thread safe
 	machinesUsed[offer.Hostname] = struct{}{}
 
 	taskPtr := state.taskman.newTaskForMesosOffer(offer, descriptor, bindMap, targetExecutorId)
@@ -1566,12 +1565,11 @@ func makeTaskForMesosResources(
 	ldLibPath, ok := agentForCache.Attributes.Get("executor_env_LD_LIBRARY_PATH")
 	mesosTaskInfo.Executor.Command.Environment = &mesos.Environment{}
 	if ok {
-		mesosTaskInfo.Executor.Command.Environment.Variables =
-			append(mesosTaskInfo.Executor.Command.Environment.Variables,
-				mesos.Environment_Variable{
-					Name:  "LD_LIBRARY_PATH",
-					Value: proto.String(ldLibPath),
-				})
+		mesosTaskInfo.Executor.Command.Environment.Variables = append(mesosTaskInfo.Executor.Command.Environment.Variables,
+			mesos.Environment_Variable{
+				Name:  "LD_LIBRARY_PATH",
+				Value: proto.String(ldLibPath),
+			})
 	}
 
 	return taskPtr, &mesosTaskInfo
diff --git a/core/task/schedulerstate.go b/core/task/schedulerstate.go
index 9d783b36..635480b6 100644
--- a/core/task/schedulerstate.go
+++ b/core/task/schedulerstate.go
@@ -69,8 +69,9 @@ type schedulerState struct {
 	reviveTokens  <-chan struct{}
 	tasksToDeploy chan *ResourceOffersDeploymentRequest
 
-	reviveOffersTrg chan struct{}
-	random          *rand.Rand
+	reviveOffersTrg  chan struct{}
+	reviveOffersDone chan struct{}
+	random           *rand.Rand
 
 	// shouldn't change at runtime, so thread safe:
 	role string
@@ -106,8 +107,6 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
 
 	tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)
 
-	reviveOffersTrg := make(chan struct{})
-
 	state := &schedulerState{
 		taskman:  taskman,
 		fidStore: fidStore,
@@ -117,7 +116,8 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
 			viper.GetDuration("mesosReviveWait"),
 			nil),
 		tasksToDeploy:      tasksToDeploy,
-		reviveOffersTrg:    reviveOffersTrg,
+		reviveOffersTrg:    make(chan struct{}),
+		reviveOffersDone:   make(chan struct{}),
 		wantsTaskResources: mesos.Resources{},
 		executor:           executorInfo,
 		metricsAPI:         metricsAPI,
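
Note on the reviveOffersTrg / reviveOffersDone split above: with a single unbuffered channel serving as both trigger and acknowledgement, a concurrent caller's trigger send can in principle be consumed by another caller that is waiting for its acknowledgement, so the two meanings can cross. The standalone sketch below illustrates the two-channel handshake this diff adopts; the scheduler type, the run and requestRevive names, and the time.Sleep stand-in for doReviveOffers are illustrative assumptions, not AliECS API.

	// Minimal sketch of the trigger/done handshake; names are hypothetical.
	package main

	import (
		"fmt"
		"time"
	)

	type scheduler struct {
		reviveOffersTrg  chan struct{} // request: "please revive offers"
		reviveOffersDone chan struct{} // acknowledgement: "revive call finished"
	}

	// run mirrors the goroutine in runSchedulerController: wait for a trigger,
	// do the work, then report completion on the dedicated done channel.
	func (s *scheduler) run() {
		for {
			<-s.reviveOffersTrg
			time.Sleep(10 * time.Millisecond) // stand-in for the actual REVIVE call
			s.reviveOffersDone <- struct{}{}
		}
	}

	// requestRevive mirrors the caller side in acquireTasks: signal the
	// scheduler, then block until the revive has actually been performed.
	func (s *scheduler) requestRevive() {
		s.reviveOffersTrg <- struct{}{}
		<-s.reviveOffersDone
	}

	func main() {
		s := &scheduler{
			reviveOffersTrg:  make(chan struct{}),
			reviveOffersDone: make(chan struct{}),
		}
		go s.run()

		s.requestRevive()
		fmt.Println("offers revived, safe to proceed with deployment")
	}

Because the done channel is only sent to by the revive goroutine and only received by the caller that is waiting, concurrent requestRevive calls can no longer pick up each other's signals the way they could, at least in principle, on the shared reviveOffersTrg.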