use two channels to communicate mesos REVIVE #623

Merged 1 commit on Oct 17, 2024
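This PR replaces the single channel previously used for both directions of the Mesos REVIVE handshake with a dedicated pair: `reviveOffersTrg` carries the request from `Manager.acquireTasks` to the scheduler loop, and the new `reviveOffersDone` carries the completion signal back. The sketch below is a simplified, self-contained illustration of that pattern; only the two channel names come from the diff, while the `reviver` type and the sleep standing in for `doReviveOffers` are invented for the example.

```go
package main

import (
	"fmt"
	"time"
)

// reviver is a stand-in for the scheduler state; only the two channels
// come from the diff, the rest is simplified for illustration.
type reviver struct {
	reviveOffersTrg  chan struct{}
	reviveOffersDone chan struct{}
}

// run plays the role of the goroutine started in runSchedulerController:
// wait for a trigger, revive offers, then report completion on the
// dedicated done channel.
func (r *reviver) run() {
	for {
		<-r.reviveOffersTrg               // wait for a revive request
		time.Sleep(10 * time.Millisecond) // placeholder for doReviveOffers(ctx, state)
		r.reviveOffersDone <- struct{}{}  // signal completion on the second channel
	}
}

// acquire mimics the caller side in Manager.acquireTasks.
func (r *reviver) acquire(id int) {
	r.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
	<-r.reviveOffersDone            // we only continue when it's done
	fmt.Printf("caller %d: revive cycle finished\n", id)
}

func main() {
	r := &reviver{
		reviveOffersTrg:  make(chan struct{}),
		reviveOffersDone: make(chan struct{}),
	}
	go r.run()
	r.acquire(1)
	r.acquire(2)
}
```

Both channels are created unbuffered (see the `NewScheduler` hunk in `core/task/schedulerstate.go`), so each send is a rendezvous: the requester blocks until the scheduler has accepted the trigger, and then again until the revive cycle has actually finished.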
34 changes: 15 additions & 19 deletions core/task/manager.go
@@ -91,8 +91,9 @@ type Manager struct {

tasksToDeploy chan<- *ResourceOffersDeploymentRequest

reviveOffersTrg chan struct{}
cq *controlcommands.CommandQueue
reviveOffersTrg chan struct{}
reviveOffersDone chan struct{}
cq *controlcommands.CommandQueue

tasksLaunched int
tasksFinished int
@@ -141,6 +142,7 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
taskman.cq = taskman.schedulerState.commandqueue
taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
taskman.reviveOffersDone = taskman.schedulerState.reviveOffersDone
taskman.ackKilledTasks = newAcks()

schedState.setupCli()
@@ -156,7 +158,8 @@ func (m *Manager) newTaskForMesosOffer(
offer *mesos.Offer,
descriptor *Descriptor,
localBindMap channel.BindMap,
executorId mesos.ExecutorID) (t *Task) {
executorId mesos.ExecutorID,
) (t *Task) {
newId := uid.New().String()
t = &Task{
name: fmt.Sprintf("%s#%s", descriptor.TaskClassName, newId),
@@ -197,8 +200,8 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
if err != nil {
return
}
repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] //get IRepo pointer from RepoManager
if repo == nil { //should never end up here
repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] // get IRepo pointer from RepoManager
if repo == nil { // should never end up here
return nil, errors.New("getTaskClassList: repo not found for " + taskClass)
}

@@ -223,7 +226,6 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
taskInfo := strings.Split(taskPath, "/tasks/")
if len(taskInfo) == 1 {
taskFilename = taskInfo[0]

} else {
taskFilename = taskInfo[1]
}
@@ -280,7 +282,7 @@ func (m *Manager) removeInactiveClasses() {
return
}

func (m *Manager) RemoveReposClasses(repoPath string) { //Currently unused
func (m *Manager) RemoveReposClasses(repoPath string) { // Currently unused
utils.EnsureTrailingSlash(&repoPath)

_ = m.classes.Do(func(classMap *map[string]*taskclass.Class) error {
@@ -327,7 +329,6 @@ func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) {
}

func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) {

/*
Here's what's gonna happen:
1) check if any tasks are already in Roster, whether they are already locked
@@ -516,7 +517,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
timeReviveOffers := time.Now()
timeDeployMu := time.Now()
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
<-m.reviveOffersTrg // we only continue when it's done
<-m.reviveOffersDone // we only continue when it's done
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))
@@ -597,7 +598,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
// can't lock some of them, so we must roll back and keep them
// unlocked in the roster.
var deployedTaskIds []string
for taskPtr, _ := range deployedTasks {
for taskPtr := range deployedTasks {
taskPtr.SetParent(nil)
deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
}
@@ -612,11 +613,11 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
}

// Finally, we write to the roster. Point of no return!
for taskPtr, _ := range deployedTasks {
for taskPtr := range deployedTasks {
m.roster.append(taskPtr)
}
if deploymentSuccess {
for taskPtr, _ := range deployedTasks {
for taskPtr := range deployedTasks {
taskPtr.GetParent().SetTask(taskPtr)
}
for taskPtr, descriptor := range tasksAlreadyRunning {
@@ -629,7 +630,6 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
}

func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error {

taskReleaseErrors := make(map[string]error)
taskIdsReleased := make([]string, 0)

@@ -686,7 +686,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
taskPath := task.GetParentRolePath()
for inbChName, endpoint := range task.GetLocalBindMap() {
var bindMapKey string
if strings.HasPrefix(inbChName, "::") { //global channel alias
if strings.HasPrefix(inbChName, "::") { // global channel alias
bindMapKey = inbChName

// deduplication
@@ -785,7 +785,6 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
func (m *Manager) transitionTasks(envId uid.ID, tasks Tasks, src string, event string, dest string, commonArgs controlcommands.PropertyMap) error {
notify := make(chan controlcommands.MesosCommandResponse)
receivers, err := tasks.GetMesosCommandTargets()

if err != nil {
return err
}
@@ -870,7 +869,6 @@ func (m *Manager) TriggerHooks(envId uid.ID, tasks Tasks) error {

notify := make(chan controlcommands.MesosCommandResponse)
receivers, err := tasks.GetMesosCommandTargets()

if err != nil {
return err
}
@@ -935,7 +933,6 @@ func (m *Manager) GetTask(id string) *Task {
}

func (m *Manager) updateTaskState(taskId string, state string) {

taskPtr := m.roster.getByTaskId(taskId)
if taskPtr == nil {
log.WithField("taskId", taskId).
@@ -989,7 +986,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
}
if ack, ok := m.ackKilledTasks.getValue(taskId); ok {
ack <- struct{}{}
//close(ack) // It can even be left open?
// close(ack) // It can even be left open?
}

return
@@ -1030,7 +1027,6 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {

// Kill all tasks outside an environment (all unlocked tasks)
func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {

toKill := m.roster.filtered(func(t *Task) bool {
return !t.IsLocked()
})
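The `acquireTasks` hunk above now blocks on `<-m.reviveOffersDone` instead of reading back from `m.reviveOffersTrg`. As a purely hypothetical illustration of why the old round-trip on one channel is fragile (this scenario is inferred from the change, not stated in the PR), the toy program below runs two concurrent requesters against a scheduler that acks on the same channel it is triggered on: one requester's receive can consume the other's trigger, so it unblocks before any offers were revived, and a single revive cycle may end up serving both requests.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Hypothetical reconstruction of the pre-PR scheme, not code from the
// repository: trigger and acknowledgement share one unbuffered channel.
func main() {
	revive := make(chan struct{})

	// scheduler side: take a trigger, revive, then ack on the same channel
	go func() {
		for {
			<-revive
			time.Sleep(50 * time.Millisecond) // pretend to revive offers
			revive <- struct{}{}
		}
	}()

	var wg sync.WaitGroup
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			revive <- struct{}{} // trigger a revive
			<-revive             // may consume the OTHER caller's trigger, not the ack
			fmt.Printf("caller %d unblocked\n", id)
		}(i)
	}
	wg.Wait()
}
```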
32 changes: 15 additions & 17 deletions core/task/scheduler.go
@@ -84,7 +84,8 @@ var schedEventsCh = make(chan scheduler.Event_Type)

func runSchedulerController(ctx context.Context,
state *schedulerState,
fidStore store.Singleton) error {
fidStore store.Singleton,
) error {
// Set up communication from controller to state machine.
go func() {
for {
@@ -103,7 +104,7 @@ func runSchedulerController(ctx context.Context,
for {
<-state.reviveOffersTrg
doReviveOffers(ctx, state)
state.reviveOffersTrg <- struct{}{}
state.reviveOffersDone <- struct{}{}
}
}()

@@ -272,7 +273,6 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
// only one entry in the list, we signal back to commandqueue
// otherwise, we log and ignore.
return func(ctx context.Context, e *scheduler.Event) (err error) {

mesosMessage := e.GetMessage()
if mesosMessage == nil {
err = errors.New("message handler got bad MESSAGE")
@@ -336,7 +336,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
return
}
state.taskman.internalEventCh <- ev
//state.handleDeviceEvent(ev)
// state.handleDeviceEvent(ev)
} else {
log.WithFields(logrus.Fields{
"type": incomingEvent.Type.String(),
@@ -437,7 +437,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
timeResourceOffersCall := time.Now()
var (
offers = e.GetOffers().GetOffers()
callOption = calls.RefuseSeconds(time.Second) //calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
callOption = calls.RefuseSeconds(time.Second) // calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
tasksLaunchedThisCycle = 0
offersDeclined = 0
)
@@ -613,7 +613,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
var offerWaitGroup sync.WaitGroup
offerWaitGroup.Add(len(offers))

for offerIndex, _ := range offers {
for offerIndex := range offers {
go func(offerIndex int) {
defer offerWaitGroup.Done()

@@ -1013,7 +1013,6 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)).
WithField("offers", len(offers)).
WithField("offerHost", offer.Hostname))

}(offerIndex) // end for offer closure
} // end for _, offer := range offers
offerWaitGroup.Wait()
@@ -1094,7 +1093,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
machinesUsedSlice := func(machines map[string]struct{}) []string { // StringSet to StringSlice
out := make([]string, len(machines))
i := 0
for k, _ := range machines {
for k := range machines {
out[i] = k
i++
}
@@ -1184,6 +1183,7 @@ func (state *schedulerState) statusUpdate() events.HandlerFunc {
// have set through ACCEPT or DECLINE calls, in the hope that Mesos then sends us new resource offers.
// This should generally run when we have received a TASK_FINISHED for some tasks, and we have more
// tasks to run.

func (state *schedulerState) tryReviveOffers(ctx context.Context) {
// limit the rate at which we request offer revival
select {
@@ -1274,7 +1274,7 @@ func logAllEvents() eventrules.Rule {
}
}
offerIds := make([]string, len(off))
for i, _ := range off {
for i := range off {
offerIds[i] = off[i].GetID().Value
}
fields["offerIds"] = strings.Join(offerIds, ",")
@@ -1335,7 +1335,6 @@ func makeTaskForMesosResources(
descriptorDetector string,
offerIDsToDecline map[mesos.OfferID]struct{},
) (*Task, *mesos.TaskInfo) {

bindMap := make(channel.BindMap)
for _, ch := range wants.InboundChannels {
if ch.Addressing == channel.IPC {
@@ -1368,7 +1367,7 @@ func makeTaskForMesosResources(
Attributes: offer.Attributes,
Hostname: offer.Hostname,
}
state.taskman.AgentCache.Update(agentForCache) //thread safe
state.taskman.AgentCache.Update(agentForCache) // thread safe
machinesUsed[offer.Hostname] = struct{}{}

taskPtr := state.taskman.newTaskForMesosOffer(offer, descriptor, bindMap, targetExecutorId)
@@ -1566,12 +1565,11 @@ func makeTaskForMesosResources(
ldLibPath, ok := agentForCache.Attributes.Get("executor_env_LD_LIBRARY_PATH")
mesosTaskInfo.Executor.Command.Environment = &mesos.Environment{}
if ok {
mesosTaskInfo.Executor.Command.Environment.Variables =
append(mesosTaskInfo.Executor.Command.Environment.Variables,
mesos.Environment_Variable{
Name: "LD_LIBRARY_PATH",
Value: proto.String(ldLibPath),
})
mesosTaskInfo.Executor.Command.Environment.Variables = append(mesosTaskInfo.Executor.Command.Environment.Variables,
mesos.Environment_Variable{
Name: "LD_LIBRARY_PATH",
Value: proto.String(ldLibPath),
})
}

return taskPtr, &mesosTaskInfo
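The comment above `tryReviveOffers` in the hunks above describes the surrounding machinery: a REVIVE call asks Mesos to disregard what was previously set through ACCEPT or DECLINE calls, in the hope of receiving fresh resource offers, and its rate is limited through the `reviveTokens` channel, which `NewScheduler` appears to configure from the `mesosReviveWait` duration (see the `schedulerstate.go` diff below). The sketch that follows shows one possible shape of such a token-based limiter; the helper `newReviveTokens` and its refill policy are assumptions for illustration, not the project's actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// newReviveTokens is an assumed helper for this sketch: it emits at most
// one token per wait interval, with one token available immediately so
// the first revive is never delayed.
func newReviveTokens(wait time.Duration) <-chan struct{} {
	tokens := make(chan struct{}, 1)
	tokens <- struct{}{}
	go func() {
		for range time.Tick(wait) {
			select {
			case tokens <- struct{}{}: // refill if the bucket is empty
			default: // a token is already waiting; drop this one
			}
		}
	}()
	return tokens
}

// tryReviveOffers only issues a (simulated) REVIVE when a token is available,
// mirroring the non-blocking select hinted at in the diff.
func tryReviveOffers(tokens <-chan struct{}) {
	select {
	case <-tokens:
		fmt.Println("REVIVE sent to Mesos")
	default:
		fmt.Println("revive skipped: rate limit in effect")
	}
}

func main() {
	tokens := newReviveTokens(200 * time.Millisecond)
	tryReviveOffers(tokens) // uses the initial token
	tryReviveOffers(tokens) // skipped: no token yet
	time.Sleep(250 * time.Millisecond)
	tryReviveOffers(tokens) // a fresh token has arrived
}
```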
10 changes: 5 additions & 5 deletions core/task/schedulerstate.go
@@ -69,8 +69,9 @@ type schedulerState struct {
reviveTokens <-chan struct{}
tasksToDeploy chan *ResourceOffersDeploymentRequest

reviveOffersTrg chan struct{}
random *rand.Rand
reviveOffersTrg chan struct{}
reviveOffersDone chan struct{}
random *rand.Rand

// shouldn't change at runtime, so thread safe:
role string
@@ -106,8 +107,6 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (

tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)

reviveOffersTrg := make(chan struct{})

state := &schedulerState{
taskman: taskman,
fidStore: fidStore,
@@ -117,7 +116,8 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
viper.GetDuration("mesosReviveWait"),
nil),
tasksToDeploy: tasksToDeploy,
reviveOffersTrg: reviveOffersTrg,
reviveOffersTrg: make(chan struct{}),
reviveOffersDone: make(chan struct{}),
wantsTaskResources: mesos.Resources{},
executor: executorInfo,
metricsAPI: metricsAPI,