Skip to content

Commit

Permalink
[core] Make task manager eventStream thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Mar 8, 2024
1 parent c24398f commit 4db08d1
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 46 deletions.
8 changes: 7 additions & 1 deletion core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,13 @@ func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected t
log.WithField("partition", env.id).
WithField("run", env.currentRunNumber).
Errorf("Scheduled auto stop transition failed: %s, Transitioning into ERROR", err.Error())
_ = env.TryTransition(NewGoErrorTransition(ManagerInstance().taskman))
err = env.TryTransition(NewGoErrorTransition(ManagerInstance().taskman))
if err != nil {
log.WithField("partition", env.id).
WithField("run", env.currentRunNumber).
Errorf("Forced transition to ERROR failed: %s", err.Error())
env.setState("ERROR")
}
return
}
}
Expand Down
94 changes: 49 additions & 45 deletions core/environment/eventStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
* Intergovernmental Organization or submit itself to any jurisdiction.
*/


package environment

import (
"sync"

pb "github.com/AliceO2Group/Control/core/protos"

"github.com/AliceO2Group/Control/common/event"
pb "github.com/AliceO2Group/Control/core/protos"
)

type Subscription interface {
Expand All @@ -40,40 +39,45 @@ type Subscription interface {
}

type eventStream struct {
stream chan *pb.Event
stream chan *pb.Event
mu sync.Mutex
}

func SubscribeToStream(ch chan *pb.Event) Subscription {
return &eventSub{
feed: &eventStream{
feed: &eventStream{
stream: ch,
},
err: make(chan error),
}
err: make(chan error),
}
}

func (e *eventStream) send(data *pb.Event) {
e.mu.Lock()
defer e.mu.Unlock()
if e.stream != nil {
e.stream <- data
}
}

func (e *eventStream) closeStream() {
e.mu.Lock()
defer e.mu.Unlock()
close(e.stream)
e.stream = nil
}

type eventSub struct {
feed *eventStream
once sync.Once
err chan error
feed *eventStream
once sync.Once
err chan error
}

func (s *eventSub) Unsubscribe() {
s.once.Do(func() {
s.once.Do(func() {
s.feed.closeStream()
close(s.err)
})
close(s.err)
})
}

func (s *eventSub) GetFeed() chan *pb.Event {
Expand All @@ -85,44 +89,44 @@ func (s *eventSub) Send(ev event.Event) {

switch typedEvent := ev.(type) {
case *event.RoleEvent:
re := pb.Event_RoleEvent{
RoleEvent: &pb.Ev_RoleEvent{
Name: typedEvent.GetName(),
State: typedEvent.GetState(),
Status: typedEvent.GetStatus(),
RolePath: typedEvent.GetRolePath(),
},
}
data = pb.WrapEvent(&re)
re := pb.Event_RoleEvent{
RoleEvent: &pb.Ev_RoleEvent{
Name: typedEvent.GetName(),
State: typedEvent.GetState(),
Status: typedEvent.GetStatus(),
RolePath: typedEvent.GetRolePath(),
},
}
data = pb.WrapEvent(&re)
case *event.TaskEvent:
re := pb.Event_TaskEvent{
TaskEvent: &pb.Ev_TaskEvent{
Name: typedEvent.GetName(),
Taskid: typedEvent.GetTaskID(),
State: typedEvent.GetState(),
Status: typedEvent.GetStatus(),
Hostname: typedEvent.GetHostname(),
ClassName: typedEvent.GetClassName(),
},
}
data = pb.WrapEvent(&re)
re := pb.Event_TaskEvent{
TaskEvent: &pb.Ev_TaskEvent{
Name: typedEvent.GetName(),
Taskid: typedEvent.GetTaskID(),
State: typedEvent.GetState(),
Status: typedEvent.GetStatus(),
Hostname: typedEvent.GetHostname(),
ClassName: typedEvent.GetClassName(),
},
}
data = pb.WrapEvent(&re)
case *event.EnvironmentEvent:
re := pb.Event_EnvironmentEvent{
EnvironmentEvent: &pb.Ev_EnvironmentEvent{
EnvironmentId: typedEvent.GetName(),
State: typedEvent.GetState(),
CurrentRunNumber: typedEvent.GetRun(),
Error: typedEvent.GetError(),
Message: typedEvent.GetMessage(),
},
}
data = pb.WrapEvent(&re)
re := pb.Event_EnvironmentEvent{
EnvironmentEvent: &pb.Ev_EnvironmentEvent{
EnvironmentId: typedEvent.GetName(),
State: typedEvent.GetState(),
CurrentRunNumber: typedEvent.GetRun(),
Error: typedEvent.GetError(),
Message: typedEvent.GetMessage(),
},
}
data = pb.WrapEvent(&re)
default:
// noop
}
s.feed.send(data)
}

func (s *eventSub) Err() <-chan error {
return s.err
}
return s.err
}

0 comments on commit 4db08d1

Please sign in to comment.