diff --git a/Makefile b/Makefile index 0766d7f7..2f169913 100644 --- a/Makefile +++ b/Makefile @@ -70,7 +70,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT)) GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut -TEST_DIRS := ./apricot/local ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration +TEST_DIRS := ./apricot/local ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment GO_TEST_DIRS := ./core/repos ./core/integration/dcs coverage:COVERAGE_PREFIX := ./coverage_results diff --git a/core/environment/environment.go b/core/environment/environment.go index 86148c9c..e30c93a3 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -525,6 +525,27 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, env.workflow.GetVars().Del("runNumber") // Ensure the auto stop timer is stopped (important for stop transitions NOT triggered by the timer itself) env.invalidateAutoStopTransition() + } else if e.Event == "GO_ERROR" { + endCompletionTime, ok := env.workflow.GetUserVars().Get("run_end_completion_time_ms") + if ok && endCompletionTime == "" { + runEndCompletionTime := time.Now() + runEndCompletionTimeStr := strconv.FormatInt(runEndCompletionTime.UnixMilli(), 10) + env.workflow.SetRuntimeVar("run_end_completion_time_ms", runEndCompletionTimeStr) + + the.EventWriterWithTopic(topic.Run).WriteEventWithTimestamp(&pb.Ev_RunEvent{ + EnvironmentId: 
envId.String(), + RunNumber: env.GetCurrentRunNumber(), + State: env.Sm.Current(), + Error: "", + Transition: e.Event, + TransitionStatus: pb.OpStatus_DONE_OK, + Vars: nil, + }, runEndCompletionTime) + + } else { + log.WithField("partition", envId.String()). + Debug("O2 End Completion time already set before after_GO_ERROR") + } } errorMsg := "" @@ -907,6 +928,10 @@ func (env *Environment) handlerFunc() func(e *fsm.Event) { "partition": env.id.String(), }).Debug("environment.sm starting transition") + if len(e.Args) == 0 { + e.Cancel(errors.New("transition missing in FSM event")) + return + } transition, ok := e.Args[0].(Transition) if !ok { e.Cancel(errors.New("transition wrapping error")) diff --git a/core/environment/environment_test.go b/core/environment/environment_test.go new file mode 100644 index 00000000..0d1b150e --- /dev/null +++ b/core/environment/environment_test.go @@ -0,0 +1,56 @@ +package environment + +import ( + "github.com/AliceO2Group/Control/core/integration" + "github.com/AliceO2Group/Control/core/integration/testplugin" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/spf13/viper" + "io" + "os" + "testing" +) + +const envTestConfig = "environment_test.yaml" + +var tmpDir *string // scratch directory: created in BeforeSuite, removed in AfterSuite + +var _ = BeforeSuite(func() { + var err error + tmpDir = new(string) + *tmpDir, err = os.MkdirTemp("", "o2control-core-environment") + Expect(err).NotTo(HaveOccurred()) + + // copy config files + configFiles := []string{envTestConfig} + for _, configFile := range configFiles { + from, err := os.Open("./" + configFile) + Expect(err).NotTo(HaveOccurred()) + defer from.Close() + + to, err := os.OpenFile(*tmpDir+"/"+configFile, os.O_RDWR|os.O_CREATE, 0666) + Expect(err).NotTo(HaveOccurred()) + defer to.Close() + + _, err = io.Copy(to, from) + Expect(err).NotTo(HaveOccurred()) + } + + integration.Reset() + integration.RegisterPlugin("testplugin", "testPluginEndpoint", testplugin.NewPlugin) + // viper.Reset() discards every previously set key, so all settings are applied after it + viper.Reset() + viper.Set("coreWorkingDir", *tmpDir) // used by NewRunNumber with YAML backend + viper.Set("integrationPlugins", []string{"testplugin"}) + viper.Set("testPluginEndpoint", "http://example.com") + viper.Set("config_endpoint", "file://"+*tmpDir+"/"+envTestConfig) +}) + +func TestCoreEnvironment(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Core Environment Test Suite") +} + +var _ = AfterSuite(func() { + os.RemoveAll(*tmpDir) +}) diff --git a/core/environment/environment_test.yaml b/core/environment/environment_test.yaml new file mode 100644 index 00000000..e37e5e39 --- /dev/null +++ b/core/environment/environment_test.yaml @@ -0,0 +1,13 @@ +# the contents of this file are not really used, but we need an apricot instance for environment test package, which needs a non-empty file +o2: + components: + qc: + TECHNICAL: + any: + entry: "config" + runtime: + aliecs: + defaults: + key1: value1 + vars: + key2: value2 \ No newline at end of file diff --git a/core/environment/fsm_test.go b/core/environment/fsm_test.go new file mode 100644 index 00000000..2fa50757 --- /dev/null +++ b/core/environment/fsm_test.go @@
-0,0 +1,120 @@ +package environment + +import ( + "github.com/AliceO2Group/Control/common/utils/uid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("allowed states and transitions in the environment FSM", func() { + var env *Environment + BeforeEach(func() { + envId, err := uid.FromString("2oDvieFrVTi") + Expect(err).NotTo(HaveOccurred()) + + env, err = newEnvironment(nil, envId) + Expect(err).NotTo(HaveOccurred()) + Expect(env).NotTo(BeNil()) + }) + When("FSM is created", func() { + It("should be in STANDBY", func() { + Expect(env.Sm.Current()).To(Equal("STANDBY")) + }) + }) + When("FSM is in STANDBY", func() { + It("should allow for DEPLOY, GO_ERROR and EXIT transitions", func() { + env.Sm.SetState("STANDBY") + Expect(env.Sm.Can("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Can("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Can("EXIT")).To(BeTrue()) + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("STANDBY") + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + }) + }) + When("FSM is in DEPLOYED", func() { + It("should allow for CONFIGURED, GO_ERROR and EXIT transitions", func() { + env.Sm.SetState("DEPLOYED") + Expect(env.Sm.Can("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Can("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Can("EXIT")).To(BeTrue()) + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("DEPLOYED") + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + }) + }) + When("FSM is in CONFIGURED", func() { + It("should allow for START_ACTIVITY, RESET, GO_ERROR and EXIT transitions", func() { 
+ env.Sm.SetState("CONFIGURED") + Expect(env.Sm.Can("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Can("RESET")).To(BeTrue()) + Expect(env.Sm.Can("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Can("EXIT")).To(BeTrue()) + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("CONFIGURED") + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + }) + }) + When("FSM is in RUNNING", func() { + It("should allow for STOP_ACTIVITY and GO_ERROR transitions", func() { + env.Sm.SetState("RUNNING") + Expect(env.Sm.Can("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Can("GO_ERROR")).To(BeTrue()) + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("RUNNING") + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + Expect(env.Sm.Cannot("EXIT")).To(BeTrue()) + }) + }) + When("FSM is in ERROR", func() { + It("should allow for RECOVER transition", func() { + env.Sm.SetState("ERROR") + Expect(env.Sm.Can("RECOVER")).To(BeTrue()) + // We do not include EXIT as possible transition, since anyway we kill tasks not caring about the FSM. + // There is no known issue which could forbid us from that. + // TEARDOWN and DESTROY are the artificial transitions which correspond to that. 
+ }) + It("should not allow for other transitions", func() { + env.Sm.SetState("ERROR") + Expect(env.Sm.Cannot("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("EXIT")).To(BeTrue()) + }) + }) + When("FSM is in DONE", func() { + It("should not allow for any transitions", func() { + env.Sm.SetState("DONE") + Expect(env.Sm.Cannot("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + Expect(env.Sm.Cannot("EXIT")).To(BeTrue()) + }) + }) +}) diff --git a/core/environment/hooks_test.go b/core/environment/hooks_test.go new file mode 100644 index 00000000..8c20cacc --- /dev/null +++ b/core/environment/hooks_test.go @@ -0,0 +1,400 @@ +package environment + +import ( + "context" + "fmt" + "github.com/AliceO2Group/Control/common/utils/uid" + "github.com/AliceO2Group/Control/core/task" + "github.com/AliceO2Group/Control/core/workflow" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +type DummyTransition struct { + baseTransition + fail bool +} + +func NewDummyTransition(transition string, fail bool) Transition { + return &DummyTransition{ + baseTransition: baseTransition{ + name: transition, + taskman: nil, + }, + fail: fail, + } +} + +func (t DummyTransition) do(env *Environment) (err error) { + if t.fail { + return fmt.Errorf("transition successfully failed") + } + return nil +} + +var _ = Describe("calling hooks on FSM events", func() { + var env *Environment + BeforeEach(func() { + envId, err := uid.FromString("2oDvieFrVTi") + Expect(err).NotTo(HaveOccurred()) + + env, err = newEnvironment(map[string]string{}, envId) + Expect(err).NotTo(HaveOccurred()) + Expect(env).NotTo(BeNil()) + }) + + It("should execute the requested plugin call without errors", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE", false)) + + Expect(err).NotTo(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + + It("should return an error if a critical hook fails", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + env.workflow.GetUserVars().Set("testplugin_fail", "true") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE", false)) + + Expect(err).To(HaveOccurred()) + v, ok := 
env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + + It("should not return an error if an non-critical hook fails", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: false, Await: "before_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + env.workflow.GetUserVars().Set("testplugin_fail", "true") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE", false)) + + Expect(err).NotTo(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + + It("should execute a hook with await statement different than the trigger, but within the same transition", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "after_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE", false)) + + Expect(err).NotTo(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + + It("should execute a hook with await statement at a different transition than the trigger", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_RESET"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + + err := env.Sm.Event(context.Background(), "CONFIGURE", 
NewDummyTransition("CONFIGURE", false)) + Expect(err).NotTo(HaveOccurred()) + err = env.Sm.Event(context.Background(), "RESET", NewDummyTransition("RESET", false)) + Expect(err).NotTo(HaveOccurred()) + + v, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + + It("should not execute a hook that should happen after a successful transition, but the transition fails", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "after_CONFIGURE", Timeout: "5s", Critical: true, Await: "after_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE", true)) + Expect(err).To(HaveOccurred()) + + _, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeFalse()) + }) + + Context("activity-related timestamps", func() { + It("should set run_start_time_ms before before_START_ACTIVITY hooks", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_START_ACTIVITY"}, + "testplugin.TimestampObserver()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("CONFIGURED") + + err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + + v, ok := env.workflow.GetUserVars().Get("seen_run_start_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + _, ok = env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms") + Expect(ok).To(BeFalse()) + }) + It("should set run_start_completion_time_ms after after_START_ACTIVITY hooks", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + 
workflow.NewCallRole( + "call1", + task.Traits{Trigger: "after_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_START_ACTIVITY"}, + "testplugin.TimestampObserver()", + ""), + workflow.NewCallRole( + "call2", + task.Traits{Trigger: "before_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY"}, + "testplugin.TimestampObserver()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("CONFIGURED") + + err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) + + Expect(err).NotTo(HaveOccurred()) + _, ok := env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms") + Expect(ok).To(BeFalse()) + + err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false)) + + Expect(err).NotTo(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + It("should set run_end_time_ms before before_STOP_ACTIVITY hooks", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call1", + task.Traits{Trigger: "before_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY"}, + "testplugin.TimestampObserver()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("CONFIGURED") + + err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + + v, ok := env.workflow.GetUserVars().Get("seen_run_end_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + _, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + Expect(ok).To(BeFalse()) + }) + It("should set run_end_completion_time_ms after after_STOP_ACTIVITY 
hooks", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call1", + task.Traits{Trigger: "after_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_STOP_ACTIVITY"}, + "testplugin.TimestampObserver()", + ""), + workflow.NewCallRole( + "call2", + task.Traits{Trigger: "before_RESET", Timeout: "5s", Critical: true, Await: "before_RESET"}, + "testplugin.TimestampObserver()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("CONFIGURED") + + err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + + v, ok := env.workflow.GetUserVars().Get("seen_run_end_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + _, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + Expect(ok).To(BeFalse()) + + err = env.Sm.Event(context.Background(), "RESET", NewDummyTransition("RESET", false)) + Expect(err).NotTo(HaveOccurred()) + + v, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + It("should clear timestamps from previous runs and set run_start_time_ms again before before_START_ACTIVITY hooks", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_START_ACTIVITY"}, + "testplugin.TimestampObserver()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("CONFIGURED") + + err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", 
NewDummyTransition("STOP_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + + env.workflow.GetUserVars().Del("seen_run_start_time_ms") + env.workflow.GetUserVars().Del("seen_run_start_completion_time_ms") + env.workflow.GetUserVars().Del("seen_run_end_time_ms") + env.workflow.GetUserVars().Del("seen_run_end_completion_time_ms") + err = env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + + v, ok := env.workflow.GetUserVars().Get("seen_run_start_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + _, ok = env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms") + Expect(ok).To(BeFalse()) + _, ok = env.workflow.GetUserVars().Get("seen_run_end_time_ms") + Expect(ok).To(BeFalse()) + _, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + Expect(ok).To(BeFalse()) + }) + When("START_ACTIVITY transition fails", func() { + It("should set SOSOR timestamp, while the subsequent GO_ERROR transition should set SOEOR and EOEOR (but NOT EOSOR)", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("CONFIGURED") + + err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", true)) + Expect(err).To(HaveOccurred()) + + v, ok := env.workflow.GetUserVars().Get("run_start_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).NotTo(BeEmpty()) + v, ok = env.workflow.GetUserVars().Get("run_start_completion_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(BeEmpty()) + + err = env.Sm.Event(context.Background(), "GO_ERROR", NewDummyTransition("GO_ERROR", false)) + Expect(err).NotTo(HaveOccurred()) + v, ok = env.workflow.GetUserVars().Get("run_start_completion_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(BeEmpty()) + v, ok = env.workflow.GetUserVars().Get("run_end_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).NotTo(BeEmpty()) + 
v, ok = env.workflow.GetUserVars().Get("run_end_completion_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).NotTo(BeEmpty()) + }) + }) + When("STOP_ACTIVITY transition fails", func() { + It("should set SOEOR timestamp, while EOEOR should be set by subsequent GO_ERROR transition", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("CONFIGURED") + + err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", true)) + Expect(err).To(HaveOccurred()) + + v, ok := env.workflow.GetUserVars().Get("run_end_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).NotTo(BeEmpty()) + v, ok = env.workflow.GetUserVars().Get("run_end_completion_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(BeEmpty()) + + err = env.Sm.Event(context.Background(), "GO_ERROR", NewDummyTransition("GO_ERROR", false)) + Expect(err).NotTo(HaveOccurred()) + v, ok = env.workflow.GetUserVars().Get("run_end_completion_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).NotTo(BeEmpty()) + }) + }) + When("environment goes to ERROR while in RUNNING", func() { + It("should set both run end timestamps", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call1", + task.Traits{Trigger: "leave_RUNNING", Timeout: "5s", Critical: true, Await: "leave_RUNNING"}, + "testplugin.TimestampObserver()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("CONFIGURED") + + err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) + Expect(err).NotTo(HaveOccurred()) + err = env.Sm.Event(context.Background(), "GO_ERROR", NewDummyTransition("GO_ERROR", false)) + Expect(err).NotTo(HaveOccurred()) + + v, ok := 
env.workflow.GetUserVars().Get("seen_run_end_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + _, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + Expect(ok).To(BeFalse()) + v, ok = env.workflow.GetUserVars().Get("run_end_completion_time_ms") + Expect(ok).To(BeTrue()) + Expect(v).NotTo(BeEmpty()) + }) + }) + }) + + It("should allow to arrange multiple calls in order", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call3", + task.Traits{Trigger: "before_CONFIGURE+50", Timeout: "5s", Critical: true, Await: "before_CONFIGURE+50"}, + "testplugin.CallOrderObserver()", + ""), + workflow.NewCallRole( + "call2", + task.Traits{Trigger: "before_CONFIGURE+0", Timeout: "5s", Critical: true, Await: "before_CONFIGURE+0"}, + "testplugin.CallOrderObserver()", + ""), + workflow.NewCallRole( + "call1", + task.Traits{Trigger: "before_CONFIGURE-50", Timeout: "5s", Critical: true, Await: "before_CONFIGURE-50"}, + "testplugin.CallOrderObserver()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE", false)) + + Expect(err).NotTo(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("call_history") // set by testplugin.CallOrderObserver + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("root.call1,root.call2,root.call3")) + }) +}) diff --git a/core/environment/transition.go b/core/environment/transition.go index 947195c2..a821e3df 100644 --- a/core/environment/transition.go +++ b/core/environment/transition.go @@ -22,7 +22,6 @@ * Intergovernmental Organization or submit itself to any jurisdiction. 
*/ - package environment import ( @@ -61,13 +60,13 @@ func MakeTransition(taskman *task.Manager, optype pb.ControlEnvironmentRequest_O } type baseTransition struct { - taskman *task.Manager - name string + taskman *task.Manager + name string } func (t baseTransition) check() (err error) { if t.taskman == nil { - err = errors.New("cannot configure environment with nil roleman") + err = errors.New("cannot transition environment with nil taskman") } return } diff --git a/core/integration/testplugin/plugin.go b/core/integration/testplugin/plugin.go index a79bdce4..bad1b69b 100644 --- a/core/integration/testplugin/plugin.go +++ b/core/integration/testplugin/plugin.go @@ -126,6 +126,15 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { doFail = false } + doHangS, ok := varStack["testplugin_hang"] + if !ok { + doHangS = "false" + } + doHang, convErr := strconv.ParseBool(doHangS) + if convErr != nil { + doHang = false + } + stack = make(map[string]interface{}) stack["Noop"] = func() (out string) { // must formally return string even when we return nothing log.WithField("partition", envId). @@ -166,30 +175,68 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { WithField("await", call.GetTraits().Await). Infof("executing testplugin.Test call: %s", message) - rn := varStack["run_number"] - var ( - runNumber64 int64 - err error - ) - runNumber64, err = strconv.ParseInt(rn, 10, 32) - if err != nil { - runNumber64 = 0 + rolePath, ok := varStack["__call_rolepath"] + if !ok { + call.VarStack["__call_error"] = "could not get __call_rolepath" } - timeout := callable.AcquireTimeout(TESTPLUGIN_GENERAL_OP_TIMEOUT, varStack, "Test", envId) - defer log.WithField("partition", envId). - WithField("level", infologger.IL_Ops). - WithField("rolepath", call.GetParentRolePath()). - WithField("trigger", call.GetTraits().Trigger). - WithField("await", call.GetTraits().Await). - WithField("run", runNumber64). 
- Infof("executed testplugin.Test call in %s", timeout) + parentRole, ok := call.GetParentRole().(callable.ParentRole) + if ok { + parentRole.SetGlobalRuntimeVar(rolePath+"_called", "true") + } - time.Sleep(timeout) if doFail { call.VarStack["__call_error"] = "error triggered in testplugin.Test call" } + if doHang { + for { + time.Sleep(time.Second) + } + } + + return + } + stack["TimestampObserver"] = func() (out string) { + rolePath, ok := varStack["__call_rolepath"] + if !ok { + call.VarStack["__call_error"] = "could not get __call_rolepath" + return + } + parentRole, ok := call.GetParentRole().(callable.ParentRole) + if ok { + parentRole.SetGlobalRuntimeVar(rolePath+"_called", "true") + } else { + call.VarStack["__call_error"] = "could not get parent role" + return + } + + // check presence of the four expected run-related timestamps + for _, key := range []string{"run_start_time_ms", "run_start_completion_time_ms", "run_end_time_ms", "run_end_completion_time_ms"} { + value, ok := varStack[key] + if ok && len(value) > 0 && value != "0" { + parentRole.SetGlobalRuntimeVar("seen_"+key, "true") + } + } + return + } + stack["CallOrderObserver"] = func() (out string) { + rolePath, ok := varStack["__call_rolepath"] + if !ok { + call.VarStack["__call_error"] = "could not get __call_rolepath" + return + } + parentRole, ok := call.GetParentRole().(callable.ParentRole) + if !ok { + call.VarStack["__call_error"] = "could not get parent role" + return + } + callHistory := varStack["call_history"] // a missing key yields "", handled as an empty history below + if len(callHistory) == 0 { + parentRole.SetGlobalRuntimeVar("call_history", rolePath) + } else { + parentRole.SetGlobalRuntimeVar("call_history", callHistory+","+rolePath) + } return } diff --git a/core/workflow/aggregatorrole.go b/core/workflow/aggregatorrole.go index 4446e5dc..f3d28720 100644 --- a/core/workflow/aggregatorrole.go +++ b/core/workflow/aggregatorrole.go @@ -26,6 +26,7 @@ package workflow import ( "errors" + "github.com/AliceO2Group/Control/common/gera" "strings" "sync"
texttemplate "text/template" @@ -49,6 +50,18 @@ type aggregatorRole struct { aggregator } +func NewAggregatorRole(name string, roles []Role) (r Role) { + return &aggregatorRole{ + roleBase: roleBase{ + Name: name, + Defaults: gera.MakeStringMap(), + Vars: gera.MakeStringMap(), + UserVars: gera.MakeStringMap(), + }, + aggregator: aggregator{Roles: roles}, + } +} + func (r *aggregatorRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error) { // NOTE: see NOTE in roleBase.UnmarshalYAML diff --git a/core/workflow/callrole.go b/core/workflow/callrole.go index ddbb06c1..8d7b67c4 100644 --- a/core/workflow/callrole.go +++ b/core/workflow/callrole.go @@ -26,6 +26,7 @@ package workflow import ( "errors" + "github.com/AliceO2Group/Control/common/gera" "strings" texttemplate "text/template" "time" @@ -51,6 +52,20 @@ type callRole struct { ReturnVar string `yaml:"-,omitempty"` } +func NewCallRole(name string, traits task.Traits, funcCall string, returnVar string) (r Role) { + return &callRole{ + roleBase: roleBase{ + Name: name, + Defaults: gera.MakeStringMap(), + Vars: gera.MakeStringMap(), + UserVars: gera.MakeStringMap(), + }, + Traits: traits, + FuncCall: funcCall, + ReturnVar: returnVar, + } +} + func (t *callRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error) { aux := struct { Call struct { diff --git a/docs/handbook/configuration.md b/docs/handbook/configuration.md index c2b3a86a..baabe2ea 100644 --- a/docs/handbook/configuration.md +++ b/docs/handbook/configuration.md @@ -39,7 +39,7 @@ The state machine callback moments are exposed to the AliECS workflow template i * `func` - mandatory, it parses as an [`antonmedv/expr`](https://github.com/antonmedv/expr) expression that corresponds to a call to a function that belongs to an integration plugin object (e.g. `bookkeeping.StartOfRun()`, `dcs.EndOfRun()`, etc.). * `trigger` - mandatory, the expression at `func` will be executed once the state machine reaches this moment. 
* `await` - optional, if absent it defaults to the same as `trigger`, the expression at `func` needs to finish by this moment, and the state machine will block until `func` completes. -* `timeout` - optional, Go `time.Duration` expression, defaults to `30s`, the maximum time `func` will be granted to complete before its context is invalidated. +* `timeout` - optional, Go `time.Duration` expression, defaults to `30s`, the maximum time that `func` is expected to take. The value is passed to the plugin via `varStack["__call_timeout"]`, and the plugin itself is responsible for implementing the timeout mechanism. AliECS will not abort the call when the timeout value is reached! * `critical` - optional, it defaults to `true`, if `true` then a failure or timeout for `func` will send the environment state machine to `ERROR`. Consider the following example: