diff --git a/core/environment/environment.go b/core/environment/environment.go index a6b32038..41a36502 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -987,7 +987,13 @@ func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected t log.WithField("partition", env.id). WithField("run", env.currentRunNumber). Errorf("Scheduled auto stop transition failed: %s, Transitioning into ERROR", err.Error()) - _ = env.TryTransition(NewGoErrorTransition(ManagerInstance().taskman)) + err = env.TryTransition(NewGoErrorTransition(ManagerInstance().taskman)) + if err != nil { + log.WithField("partition", env.id). + WithField("run", env.currentRunNumber). + Errorf("Forced transition to ERROR failed: %s", err.Error()) + env.setState("ERROR") + } return } } diff --git a/core/environment/eventStream.go b/core/environment/eventStream.go index 1f45677e..3051ea73 100644 --- a/core/environment/eventStream.go +++ b/core/environment/eventStream.go @@ -22,14 +22,13 @@ * Intergovernmental Organization or submit itself to any jurisdiction. */ - package environment import ( "sync" - - pb "github.com/AliceO2Group/Control/core/protos" + "github.com/AliceO2Group/Control/common/event" + pb "github.com/AliceO2Group/Control/core/protos" ) type Subscription interface { @@ -40,40 +39,45 @@ type Subscription interface { } type eventStream struct { - stream chan *pb.Event + stream chan *pb.Event + mu sync.Mutex } func SubscribeToStream(ch chan *pb.Event) Subscription { return &eventSub{ - feed: &eventStream{ + feed: &eventStream{ stream: ch, }, - err: make(chan error), - } + err: make(chan error), + } } func (e *eventStream) send(data *pb.Event) { + e.mu.Lock() + defer e.mu.Unlock() if e.stream != nil { e.stream <- data } } func (e *eventStream) closeStream() { + e.mu.Lock() + defer e.mu.Unlock() close(e.stream) e.stream = nil } type eventSub struct { - feed *eventStream - once sync.Once - err chan error + feed *eventStream + once sync.Once + err chan error } func (s *eventSub) Unsubscribe() { - s.once.Do(func() { + s.once.Do(func() { s.feed.closeStream() - close(s.err) - }) + close(s.err) + }) } func (s *eventSub) GetFeed() chan *pb.Event { @@ -85,38 +89,38 @@ func (s *eventSub) Send(ev event.Event) { switch typedEvent := ev.(type) { case *event.RoleEvent: - re := pb.Event_RoleEvent{ - RoleEvent: &pb.Ev_RoleEvent{ - Name: typedEvent.GetName(), - State: typedEvent.GetState(), - Status: typedEvent.GetStatus(), - RolePath: typedEvent.GetRolePath(), - }, - } - data = pb.WrapEvent(&re) + re := pb.Event_RoleEvent{ + RoleEvent: &pb.Ev_RoleEvent{ + Name: typedEvent.GetName(), + State: typedEvent.GetState(), + Status: typedEvent.GetStatus(), + RolePath: typedEvent.GetRolePath(), + }, + } + data = pb.WrapEvent(&re) case *event.TaskEvent: - re := pb.Event_TaskEvent{ - TaskEvent: &pb.Ev_TaskEvent{ - Name: typedEvent.GetName(), - Taskid: typedEvent.GetTaskID(), - State: typedEvent.GetState(), - Status: typedEvent.GetStatus(), - Hostname: typedEvent.GetHostname(), - ClassName: typedEvent.GetClassName(), - }, - } - data = pb.WrapEvent(&re) + re := pb.Event_TaskEvent{ + TaskEvent: &pb.Ev_TaskEvent{ + Name: typedEvent.GetName(), + Taskid: typedEvent.GetTaskID(), + State: typedEvent.GetState(), + Status: typedEvent.GetStatus(), + Hostname: typedEvent.GetHostname(), + ClassName: typedEvent.GetClassName(), + }, + } + data = pb.WrapEvent(&re) case *event.EnvironmentEvent: - re := pb.Event_EnvironmentEvent{ - EnvironmentEvent: &pb.Ev_EnvironmentEvent{ - EnvironmentId: typedEvent.GetName(), - State: typedEvent.GetState(), - CurrentRunNumber: typedEvent.GetRun(), - Error: typedEvent.GetError(), - Message: typedEvent.GetMessage(), - }, - } - data = pb.WrapEvent(&re) + re := pb.Event_EnvironmentEvent{ + EnvironmentEvent: &pb.Ev_EnvironmentEvent{ + EnvironmentId: typedEvent.GetName(), + State: typedEvent.GetState(), + CurrentRunNumber: typedEvent.GetRun(), + Error: typedEvent.GetError(), + Message: typedEvent.GetMessage(), + }, + } + data = pb.WrapEvent(&re) default: // noop } @@ -124,5 +128,5 @@ func (s *eventSub) Send(ev event.Event) { } func (s *eventSub) Err() <-chan error { - return s.err -} \ No newline at end of file + return s.err +}