Skip to content

Commit

Permalink
[core] Include workflow template info (incl public) in all env events
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Oct 4, 2024
1 parent ef3f414 commit fe45651
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 231 deletions.
261 changes: 146 additions & 115 deletions core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Environment struct {
eventStream Subscription
Public bool // From workflow or user
Description string // From workflow
WorkflowPath string // From workflow load

callsPendingAwait map[string] /*await expression, trigger only*/ callable.CallsMap
currentTransition string
Expand Down Expand Up @@ -167,27 +168,29 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
trigger := fmt.Sprintf("before_%s", e.Event)

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.GetCurrentRunNumber(),
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step starting",
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.GetCurrentRunNumber(),
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step starting",
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

// first, we execute hooks which should be executed before an event officially starts
errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger)
if errHooks != nil {
e.Cancel(errHooks)
the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.GetCurrentRunNumber(),
Error: errHooks.Error(),
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step finished",
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.GetCurrentRunNumber(),
Error: errHooks.Error(),
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step finished",
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})
return
}
Expand Down Expand Up @@ -318,27 +321,29 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
}

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: trigger,
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: trigger,
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})
},
"leave_state": func(_ context.Context, e *fsm.Event) {
trigger := fmt.Sprintf("leave_%s", e.Src)

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step starting",
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step starting",
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger)
Expand All @@ -357,13 +362,14 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
if errHooks != nil {
e.Cancel(errHooks)
the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.GetCurrentRunNumber(),
Error: errHooks.Error(),
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step finished",
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.GetCurrentRunNumber(),
Error: errHooks.Error(),
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step finished",
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})
return
}
Expand All @@ -379,28 +385,30 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
}

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: trigger,
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: trigger,
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

if e.Err != nil {
return
}

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Message: "transition step starting",
Transition: e.Event,
TransitionStep: fmt.Sprintf("tasks_%s", e.Event),
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Message: "transition step starting",
Transition: e.Event,
TransitionStep: fmt.Sprintf("tasks_%s", e.Event),
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

env.handlerFunc()(e)
Expand All @@ -410,27 +418,29 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
}

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: e.Dst, // exceptionally we take the destination state here instead of the current, because the tasks have transitioned
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: fmt.Sprintf("tasks_%s", e.Event),
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: e.Dst, // exceptionally we take the destination state here instead of the current, because the tasks have transitioned
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: fmt.Sprintf("tasks_%s", e.Event),
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})
},
"enter_state": func(_ context.Context, e *fsm.Event) {
trigger := fmt.Sprintf("enter_%s", e.Dst)

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step starting",
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step starting",
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger)
Expand All @@ -450,14 +460,15 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
}

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: trigger,
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: trigger,
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

if e.Err != nil {
Expand All @@ -481,13 +492,14 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
trigger := fmt.Sprintf("after_%s", e.Event)

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step starting",
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step starting",
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger)
Expand Down Expand Up @@ -617,14 +629,15 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,

// publish transition step complete event
the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: trigger,
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: errorMsg,
Message: "transition step finished",
Transition: e.Event,
TransitionStep: trigger,
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})
},
},
Expand Down Expand Up @@ -970,47 +983,51 @@ func (env *Environment) TryTransition(t Transition) (err error) {
defer env.transitionMutex.Unlock()

the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Message: "transition starting",
Transition: t.eventName(),
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Message: "transition starting",
Transition: t.eventName(),
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

err = t.check()
if err != nil {
the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: err.Error(),
Message: "transition impossible",
Transition: t.eventName(),
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: err.Error(),
Message: "transition impossible",
Transition: t.eventName(),
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})
return
}
err = env.Sm.Event(context.Background(), t.eventName(), t)

if err != nil {
the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: err.Error(),
Message: "transition error",
Transition: t.eventName(),
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Error: err.Error(),
Message: "transition error",
Transition: t.eventName(),
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})
} else {
the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Message: "transition completed successfully",
Transition: t.eventName(),
LastRequestUser: env.GetLastRequestUser(),
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.currentRunNumber,
Message: "transition completed successfully",
Transition: t.eventName(),
LastRequestUser: env.GetLastRequestUser(),
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})
}
return
Expand Down Expand Up @@ -1387,6 +1404,20 @@ func (env *Environment) GetVarsAsString() string {
return sortMapToString(varStack)
}

func (env *Environment) GetWorkflowInfo() *pb.WorkflowTemplateInfo {
if env == nil || env.workflow == nil {
return &pb.WorkflowTemplateInfo{}
}

out := &pb.WorkflowTemplateInfo{
Name: env.name,
Description: env.Description,
Path: env.WorkflowPath,
Public: env.Public,
}
return out
}

// return true if the auto stop transition has been scheduled, false otherwise
// if true, the expected stop time is also returned
func (env *Environment) scheduleAutoStopTransition() (scheduled bool, expected time.Time) {
Expand Down
Loading

0 comments on commit fe45651

Please sign in to comment.