Skip to content

Commit

Permalink
Revert "use two channel to communicate mesos REVIVE"
Browse files Browse the repository at this point in the history
This reverts commit e99d419.
  • Loading branch information
Michal Tichák committed Oct 20, 2024
1 parent e619a77 commit 0e7d263
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 35 deletions.
34 changes: 19 additions & 15 deletions core/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,8 @@ type Manager struct {

tasksToDeploy chan<- *ResourceOffersDeploymentRequest

reviveOffersTrg chan struct{}
reviveOffersDone chan struct{}
cq *controlcommands.CommandQueue
reviveOffersTrg chan struct{}
cq *controlcommands.CommandQueue

tasksLaunched int
tasksFinished int
Expand Down Expand Up @@ -142,7 +141,6 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
taskman.cq = taskman.schedulerState.commandqueue
taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
taskman.reviveOffersDone = taskman.schedulerState.reviveOffersDone
taskman.ackKilledTasks = newAcks()

schedState.setupCli()
Expand All @@ -158,8 +156,7 @@ func (m *Manager) newTaskForMesosOffer(
offer *mesos.Offer,
descriptor *Descriptor,
localBindMap channel.BindMap,
executorId mesos.ExecutorID,
) (t *Task) {
executorId mesos.ExecutorID) (t *Task) {
newId := uid.New().String()
t = &Task{
name: fmt.Sprintf("%s#%s", descriptor.TaskClassName, newId),
Expand Down Expand Up @@ -200,8 +197,8 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
if err != nil {
return
}
repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] // get IRepo pointer from RepoManager
if repo == nil { // should never end up here
repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] //get IRepo pointer from RepoManager
if repo == nil { //should never end up here
return nil, errors.New("getTaskClassList: repo not found for " + taskClass)
}

Expand All @@ -226,6 +223,7 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
taskInfo := strings.Split(taskPath, "/tasks/")
if len(taskInfo) == 1 {
taskFilename = taskInfo[0]

} else {
taskFilename = taskInfo[1]
}
Expand Down Expand Up @@ -282,7 +280,7 @@ func (m *Manager) removeInactiveClasses() {
return
}

func (m *Manager) RemoveReposClasses(repoPath string) { // Currently unused
func (m *Manager) RemoveReposClasses(repoPath string) { //Currently unused
utils.EnsureTrailingSlash(&repoPath)

_ = m.classes.Do(func(classMap *map[string]*taskclass.Class) error {
Expand Down Expand Up @@ -329,6 +327,7 @@ func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) {
}

func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) {

/*
Here's what's gonna happen:
1) check if any tasks are already in Roster, whether they are already locked
Expand Down Expand Up @@ -517,7 +516,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
timeReviveOffers := time.Now()
timeDeployMu := time.Now()
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
<-m.reviveOffersDone // we only continue when it's done
<-m.reviveOffersTrg // we only continue when it's done
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))
Expand Down Expand Up @@ -598,7 +597,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
// can't lock some of them, so we must roll back and keep them
// unlocked in the roster.
var deployedTaskIds []string
for taskPtr := range deployedTasks {
for taskPtr, _ := range deployedTasks {
taskPtr.SetParent(nil)
deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
}
Expand All @@ -613,11 +612,11 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
}

// Finally, we write to the roster. Point of no return!
for taskPtr := range deployedTasks {
for taskPtr, _ := range deployedTasks {
m.roster.append(taskPtr)
}
if deploymentSuccess {
for taskPtr := range deployedTasks {
for taskPtr, _ := range deployedTasks {
taskPtr.GetParent().SetTask(taskPtr)
}
for taskPtr, descriptor := range tasksAlreadyRunning {
Expand All @@ -630,6 +629,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
}

func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error {

taskReleaseErrors := make(map[string]error)
taskIdsReleased := make([]string, 0)

Expand Down Expand Up @@ -686,7 +686,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
taskPath := task.GetParentRolePath()
for inbChName, endpoint := range task.GetLocalBindMap() {
var bindMapKey string
if strings.HasPrefix(inbChName, "::") { // global channel alias
if strings.HasPrefix(inbChName, "::") { //global channel alias
bindMapKey = inbChName

// deduplication
Expand Down Expand Up @@ -785,6 +785,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
func (m *Manager) transitionTasks(envId uid.ID, tasks Tasks, src string, event string, dest string, commonArgs controlcommands.PropertyMap) error {
notify := make(chan controlcommands.MesosCommandResponse)
receivers, err := tasks.GetMesosCommandTargets()

if err != nil {
return err
}
Expand Down Expand Up @@ -869,6 +870,7 @@ func (m *Manager) TriggerHooks(envId uid.ID, tasks Tasks) error {

notify := make(chan controlcommands.MesosCommandResponse)
receivers, err := tasks.GetMesosCommandTargets()

if err != nil {
return err
}
Expand Down Expand Up @@ -933,6 +935,7 @@ func (m *Manager) GetTask(id string) *Task {
}

func (m *Manager) updateTaskState(taskId string, state string) {

taskPtr := m.roster.getByTaskId(taskId)
if taskPtr == nil {
log.WithField("taskId", taskId).
Expand Down Expand Up @@ -986,7 +989,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
}
if ack, ok := m.ackKilledTasks.getValue(taskId); ok {
ack <- struct{}{}
// close(ack) // It can even be left open?
//close(ack) // It can even be left open?
}

return
Expand Down Expand Up @@ -1027,6 +1030,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {

// Kill all tasks outside an environment (all unlocked tasks)
func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {

toKill := m.roster.filtered(func(t *Task) bool {
return !t.IsLocked()
})
Expand Down
32 changes: 17 additions & 15 deletions core/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ var schedEventsCh = make(chan scheduler.Event_Type)

func runSchedulerController(ctx context.Context,
state *schedulerState,
fidStore store.Singleton,
) error {
fidStore store.Singleton) error {
// Set up communication from controller to state machine.
go func() {
for {
Expand All @@ -104,7 +103,7 @@ func runSchedulerController(ctx context.Context,
for {
<-state.reviveOffersTrg
doReviveOffers(ctx, state)
state.reviveOffersDone <- struct{}{}
state.reviveOffersTrg <- struct{}{}
}
}()

Expand Down Expand Up @@ -273,6 +272,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
// only one entry in the list, we signal back to commandqueue
// otherwise, we log and ignore.
return func(ctx context.Context, e *scheduler.Event) (err error) {

mesosMessage := e.GetMessage()
if mesosMessage == nil {
err = errors.New("message handler got bad MESSAGE")
Expand Down Expand Up @@ -336,7 +336,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
return
}
state.taskman.internalEventCh <- ev
// state.handleDeviceEvent(ev)
//state.handleDeviceEvent(ev)
} else {
log.WithFields(logrus.Fields{
"type": incomingEvent.Type.String(),
Expand Down Expand Up @@ -437,7 +437,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
timeResourceOffersCall := time.Now()
var (
offers = e.GetOffers().GetOffers()
callOption = calls.RefuseSeconds(time.Second) // calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
callOption = calls.RefuseSeconds(time.Second) //calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
tasksLaunchedThisCycle = 0
offersDeclined = 0
)
Expand Down Expand Up @@ -613,7 +613,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
var offerWaitGroup sync.WaitGroup
offerWaitGroup.Add(len(offers))

for offerIndex := range offers {
for offerIndex, _ := range offers {
go func(offerIndex int) {
defer offerWaitGroup.Done()

Expand Down Expand Up @@ -1013,6 +1013,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)).
WithField("offers", len(offers)).
WithField("offerHost", offer.Hostname))

}(offerIndex) // end for offer closure
} // end for _, offer := range offers
offerWaitGroup.Wait()
Expand Down Expand Up @@ -1093,7 +1094,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
machinesUsedSlice := func(machines map[string]struct{}) []string { // StringSet to StringSlice
out := make([]string, len(machines))
i := 0
for k := range machines {
for k, _ := range machines {
out[i] = k
i++
}
Expand Down Expand Up @@ -1183,7 +1184,6 @@ func (state *schedulerState) statusUpdate() events.HandlerFunc {
// have set through ACCEPT or DECLINE calls, in the hope that Mesos then sends us new resource offers.
// This should generally run when we have received a TASK_FINISHED for some tasks, and we have more
// tasks to run.

func (state *schedulerState) tryReviveOffers(ctx context.Context) {
// limit the rate at which we request offer revival
select {
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func logAllEvents() eventrules.Rule {
}
}
offerIds := make([]string, len(off))
for i := range off {
for i, _ := range off {
offerIds[i] = off[i].GetID().Value
}
fields["offerIds"] = strings.Join(offerIds, ",")
Expand Down Expand Up @@ -1335,6 +1335,7 @@ func makeTaskForMesosResources(
descriptorDetector string,
offerIDsToDecline map[mesos.OfferID]struct{},
) (*Task, *mesos.TaskInfo) {

bindMap := make(channel.BindMap)
for _, ch := range wants.InboundChannels {
if ch.Addressing == channel.IPC {
Expand Down Expand Up @@ -1367,7 +1368,7 @@ func makeTaskForMesosResources(
Attributes: offer.Attributes,
Hostname: offer.Hostname,
}
state.taskman.AgentCache.Update(agentForCache) // thread safe
state.taskman.AgentCache.Update(agentForCache) //thread safe
machinesUsed[offer.Hostname] = struct{}{}

taskPtr := state.taskman.newTaskForMesosOffer(offer, descriptor, bindMap, targetExecutorId)
Expand Down Expand Up @@ -1565,11 +1566,12 @@ func makeTaskForMesosResources(
ldLibPath, ok := agentForCache.Attributes.Get("executor_env_LD_LIBRARY_PATH")
mesosTaskInfo.Executor.Command.Environment = &mesos.Environment{}
if ok {
mesosTaskInfo.Executor.Command.Environment.Variables = append(mesosTaskInfo.Executor.Command.Environment.Variables,
mesos.Environment_Variable{
Name: "LD_LIBRARY_PATH",
Value: proto.String(ldLibPath),
})
mesosTaskInfo.Executor.Command.Environment.Variables =
append(mesosTaskInfo.Executor.Command.Environment.Variables,
mesos.Environment_Variable{
Name: "LD_LIBRARY_PATH",
Value: proto.String(ldLibPath),
})
}

return taskPtr, &mesosTaskInfo
Expand Down
10 changes: 5 additions & 5 deletions core/task/schedulerstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ type schedulerState struct {
reviveTokens <-chan struct{}
tasksToDeploy chan *ResourceOffersDeploymentRequest

reviveOffersTrg chan struct{}
reviveOffersDone chan struct{}
random *rand.Rand
reviveOffersTrg chan struct{}
random *rand.Rand

// shouldn't change at runtime, so thread safe:
role string
Expand Down Expand Up @@ -107,6 +106,8 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (

tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)

reviveOffersTrg := make(chan struct{})

state := &schedulerState{
taskman: taskman,
fidStore: fidStore,
Expand All @@ -116,8 +117,7 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
viper.GetDuration("mesosReviveWait"),
nil),
tasksToDeploy: tasksToDeploy,
reviveOffersTrg: make(chan struct{}),
reviveOffersDone: make(chan struct{}),
reviveOffersTrg: reviveOffersTrg,
wantsTaskResources: mesos.Resources{},
executor: executorInfo,
metricsAPI: metricsAPI,
Expand Down

0 comments on commit 0e7d263

Please sign in to comment.