Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Commit

Permalink
3 usability improvements -Named executions, improved connection handl…
Browse files Browse the repository at this point in the history
…ing and better logging (#349)

* Flytectl will clearly print the endpoint that is unable to connect to

flyteorg/flyte#2762

Signed-off-by: Ketan Umare <[email protected]>

* Use an optional name argument to run an execution

Signed-off-by: Ketan Umare <[email protected]>

* Support for skipping initializing flyte client

Signed-off-by: Ketan Umare <[email protected]>

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Aug 19, 2022
1 parent a9db9a8 commit 6a3744c
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 49 deletions.
1 change: 1 addition & 0 deletions cmd/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func CreateCompileCommand() map[string]cmdCore.CommandEntry {
CmdFunc: compile,
PFlagProvider: config.DefaultCompileConfig,
ProjectDomainNotRequired: true,
DisableFlyteClient: true,
},
}
return compileResourcesFuncs
Expand Down
32 changes: 26 additions & 6 deletions cmd/core/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"fmt"

"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flytectl/cmd/config"
"github.com/flyteorg/flytectl/pkg/pkce"
"github.com/flyteorg/flyteidl/clients/go/admin"
Expand All @@ -23,6 +27,7 @@ type CommandEntry struct {
Short string
Long string
PFlagProvider PFlagProvider
DisableFlyteClient bool
}

func AddCommands(rootCmd *cobra.Command, cmdFuncs map[string]CommandEntry) {
Expand Down Expand Up @@ -65,14 +70,29 @@ func generateCommandFunc(cmdEntry CommandEntry) func(cmd *cobra.Command, args []
return cmdEntry.CmdFunc(ctx, args, CommandContext{})
}

clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).
WithTokenCache(pkce.TokenCacheKeyringProvider{
ServiceUser: fmt.Sprintf("%s:%s", adminCfg.Endpoint.String(), pkce.KeyRingServiceUser),
ServiceName: pkce.KeyRingServiceName,
}).Build(ctx)
cmdCtx := NewCommandContextNoClient(cmd.OutOrStdout())
if !cmdEntry.DisableFlyteClient {
clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).
WithTokenCache(pkce.TokenCacheKeyringProvider{
ServiceUser: fmt.Sprintf("%s:%s", adminCfg.Endpoint.String(), pkce.KeyRingServiceUser),
ServiceName: pkce.KeyRingServiceName,
}).Build(ctx)
if err != nil {
return err
}
cmdCtx = NewCommandContext(clientSet, cmd.OutOrStdout())
}

err := cmdEntry.CmdFunc(ctx, args, cmdCtx)
if err != nil {
if s, ok := status.FromError(err); ok {
if s.Code() == codes.Unavailable || s.Code() == codes.Unauthenticated || s.Code() == codes.Unknown {
return errors.WithMessage(err,
fmt.Sprintf("Connection Info: [Endpoint: %s, InsecureConnection?: %v, AuthMode: %v]", adminCfg.Endpoint.String(), adminCfg.UseInsecureConnection, adminCfg.AuthType))
}
}
return err
}
return cmdEntry.CmdFunc(ctx, args, NewCommandContext(clientSet, cmd.OutOrStdout()))
return nil
}
}
21 changes: 15 additions & 6 deletions cmd/core/cmd_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,23 @@ type CommandContext struct {
out io.Writer
}

// NewCommandContextNoClient returns a new commandContext
func NewCommandContextNoClient(out io.Writer) CommandContext {
return NewCommandContext(nil, out)
}

func NewCommandContext(clientSet *admin.Clientset, out io.Writer) CommandContext {
return CommandContext{
clientSet: clientSet,
out: out,
adminClientFetcherExt: &ext.AdminFetcherExtClient{AdminClient: clientSet.AdminClient()},
adminClientUpdateExt: &ext.AdminUpdaterExtClient{AdminClient: clientSet.AdminClient()},
adminClientDeleteExt: &ext.AdminDeleterExtClient{AdminClient: clientSet.AdminClient()},
var adminClient service.AdminServiceClient
if clientSet != nil {
adminClient = clientSet.AdminClient()
}
return NewCommandContextWithExt(
clientSet,
&ext.AdminFetcherExtClient{AdminClient: adminClient},
&ext.AdminUpdaterExtClient{AdminClient: adminClient},
&ext.AdminDeleterExtClient{AdminClient: adminClient},
out,
)
}

// NewCommandContextWithExt construct command context with injected extensions. Helps in injecting mocked ones for testing.
Expand Down
30 changes: 22 additions & 8 deletions cmd/create/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,29 @@ It is worth noting that the source's and target's project and domain can be diff
flytectl create execution --execFile execution_spec.yaml -p flytesnacks -d staging --targetProject flytesnacks
To relaunch an execution, pass the current execution ID as follows:
4. To relaunch an execution, pass the current execution ID as follows:
::
flytectl create execution --relaunch ffb31066a0f8b4d52b77 -p flytesnacks -d development
To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run:
5. To recover an execution, i.e., recreate it from the last known failure point for previously-run workflow execution, run:
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development
See :ref:` + "`ref_flyteidl.admin.ExecutionRecoverRequest`" + ` for more details.
Generic data types are supported for execution in a similar manner.
6. You can create executions idempotently by naming them. This is also a way to *name* an execution for discovery. Note,
an execution id has to be unique within a project domain. So if the *name* matches an existing execution an already exists exceptioj
will be raised.
::
flytectl create execution --recover ffb31066a0f8b4d52b77 -p flytesnacks -d development custom_name
7. Generic/Struct/Dataclass/JSON types are supported for execution in a similar manner.
The following is an example of how generic data can be specified while creating the execution.
::
Expand All @@ -100,7 +108,7 @@ The generated file would look similar to this. Here, empty values have been dump
task: core.type_system.custom_objects.add
version: v3
Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add":
8. Modified file with struct data populated for 'x' and 'y' parameters for the task "core.type_system.custom_objects.add":
::
Expand Down Expand Up @@ -171,21 +179,27 @@ func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.C
var err error
sourceProject := config.GetConfig().Project
sourceDomain := config.GetConfig().Domain

var targetExecName string
if len(args) > 0 {
targetExecName = args[0]
}

if execParams, err = readConfigAndValidate(config.GetConfig().Project, config.GetConfig().Domain); err != nil {
return err
}
var executionRequest *admin.ExecutionCreateRequest
switch execParams.execType {
case Relaunch:
return relaunchExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig)
return relaunchExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
case Recover:
return recoverExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig)
return recoverExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
case Task:
if executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig); err != nil {
if executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName); err != nil {
return err
}
case Workflow:
if executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig); err != nil {
if executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName); err != nil {
return err
}
default:
Expand Down
21 changes: 13 additions & 8 deletions cmd/create/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func createExecutionRequestForWorkflow(ctx context.Context, workflowName, project, domain string,
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig) (*admin.ExecutionCreateRequest, error) {
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig, targetExecName string) (*admin.ExecutionCreateRequest, error) {
// Fetch the launch plan
lp, err := cmdCtx.AdminFetcherExt().FetchLPVersion(ctx, workflowName, executionConfig.Version, project, domain)
if err != nil {
Expand Down Expand Up @@ -51,11 +51,11 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName, projec
}
}

return createExecutionRequest(lp.Id, inputs, securityContext, authRole), nil
return createExecutionRequest(lp.Id, inputs, securityContext, authRole, targetExecName), nil
}

func createExecutionRequestForTask(ctx context.Context, taskName string, project string, domain string,
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig) (*admin.ExecutionCreateRequest, error) {
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig, targetExecName string) (*admin.ExecutionCreateRequest, error) {
// Fetch the task
task, err := cmdCtx.AdminFetcherExt().FetchTaskVersion(ctx, taskName, executionConfig.Version, project, domain)
if err != nil {
Expand Down Expand Up @@ -97,11 +97,11 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project
Version: task.Id.Version,
}

return createExecutionRequest(id, inputs, securityContext, authRole), nil
return createExecutionRequest(id, inputs, securityContext, authRole, targetExecName), nil
}

func relaunchExecution(ctx context.Context, executionName string, project string, domain string,
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig) error {
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig, targetExecutionName string) error {
if executionConfig.DryRun {
logger.Debugf(ctx, "skipping RelaunchExecution request (DryRun)")
return nil
Expand All @@ -112,6 +112,7 @@ func relaunchExecution(ctx context.Context, executionName string, project string
Project: project,
Domain: domain,
},
Name: targetExecutionName,
})
if err != nil {
return err
Expand All @@ -121,7 +122,7 @@ func relaunchExecution(ctx context.Context, executionName string, project string
}

func recoverExecution(ctx context.Context, executionName string, project string, domain string,
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig) error {
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig, targetExecName string) error {
if executionConfig.DryRun {
logger.Debugf(ctx, "skipping RecoverExecution request (DryRun)")
return nil
Expand All @@ -132,6 +133,7 @@ func recoverExecution(ctx context.Context, executionName string, project string,
Project: project,
Domain: domain,
},
Name: targetExecName,
})
if err != nil {
return err
Expand All @@ -141,12 +143,15 @@ func recoverExecution(ctx context.Context, executionName string, project string,
}

func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, securityContext *core.SecurityContext,
authRole *admin.AuthRole) *admin.ExecutionCreateRequest {
authRole *admin.AuthRole, targetExecName string) *admin.ExecutionCreateRequest {

if len(targetExecName) == 0 {
targetExecName = "f" + strings.ReplaceAll(uuid.New().String(), "-", "")[:19]
}
return &admin.ExecutionCreateRequest{
Project: executionConfig.TargetProject,
Domain: executionConfig.TargetDomain,
Name: "f" + strings.ReplaceAll(uuid.New().String(), "-", "")[:19],
Name: targetExecName,
Spec: &admin.ExecutionSpec{
LaunchPlan: ID,
Metadata: &admin.ExecutionMetadata{
Expand Down
24 changes: 12 additions & 12 deletions cmd/create/execution_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func TestCreateExecutionForRelaunch(t *testing.T) {
s := setup()
createExecutionUtilSetup()
s.MockAdminClient.OnRelaunchExecutionMatch(s.Ctx, relaunchRequest).Return(executionCreateResponse, nil)
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
}

func TestCreateExecutionForRelaunchNotFound(t *testing.T) {
s := setup()
createExecutionUtilSetup()
s.MockAdminClient.OnRelaunchExecutionMatch(s.Ctx, relaunchRequest).Return(nil, errors.New("unknown execution"))
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")

assert.NotNil(t, err)
assert.Equal(t, err, errors.New("unknown execution"))
Expand All @@ -67,15 +67,15 @@ func TestCreateExecutionForRecovery(t *testing.T) {
s := setup()
createExecutionUtilSetup()
s.MockAdminClient.OnRecoverExecutionMatch(s.Ctx, recoverRequest).Return(executionCreateResponse, nil)
err := recoverExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := recoverExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
}

func TestCreateExecutionForRecoveryNotFound(t *testing.T) {
s := setup()
createExecutionUtilSetup()
s.MockAdminClient.OnRecoverExecutionMatch(s.Ctx, recoverRequest).Return(nil, errors.New("unknown execution"))
err := recoverExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := recoverExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.NotNil(t, err)
assert.Equal(t, err, errors.New("unknown execution"))
}
Expand All @@ -86,7 +86,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
createExecutionUtilSetup()
launchPlan := &admin.LaunchPlan{}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
Expand All @@ -101,7 +101,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
},
}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.NotNil(t, err)
assert.Nil(t, execCreateRequest)
assert.Equal(t, fmt.Errorf("parameter [nilparam] has nil Variable"), err)
Expand All @@ -110,7 +110,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
s := setup()
createExecutionUtilSetup()
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("failed"))
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.NotNil(t, err)
assert.Nil(t, execCreateRequest)
assert.Equal(t, err, errors.New("failed"))
Expand All @@ -122,7 +122,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
launchPlan := &admin.LaunchPlan{}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
s.MockAdminClient.OnGetLaunchPlanMatch(s.Ctx, mock.Anything).Return(launchPlan, nil)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
executionConfig.KubeServiceAcct = ""
Expand All @@ -139,7 +139,7 @@ func TestCreateExecutionRequestForTask(t *testing.T) {
},
}
s.FetcherExt.OnFetchTaskVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(task, nil)
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
Expand All @@ -162,7 +162,7 @@ func TestCreateExecutionRequestForTask(t *testing.T) {
},
}
s.FetcherExt.OnFetchTaskVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(task, nil)
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.NotNil(t, err)
assert.Nil(t, execCreateRequest)
assert.Equal(t, fmt.Errorf("variable [nilvar] has nil type"), err)
Expand All @@ -171,7 +171,7 @@ func TestCreateExecutionRequestForTask(t *testing.T) {
s := setup()
createExecutionUtilSetup()
s.FetcherExt.OnFetchTaskVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("failed"))
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.NotNil(t, err)
assert.Nil(t, execCreateRequest)
assert.Equal(t, err, errors.New("failed"))
Expand All @@ -186,7 +186,7 @@ func TestCreateExecutionRequestForTask(t *testing.T) {
},
}
s.FetcherExt.OnFetchTaskVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(task, nil)
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForTask(s.Ctx, "taskName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
executionConfig.KubeServiceAcct = ""
Expand Down
6 changes: 3 additions & 3 deletions cmd/demo/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ func CreateDemoCommand() *cobra.Command {
demoResourcesFuncs := map[string]cmdcore.CommandEntry{
"start": {CmdFunc: startDemoCluster, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: startShort,
Long: startLong, PFlagProvider: sandboxCmdConfig.DefaultConfig},
Long: startLong, PFlagProvider: sandboxCmdConfig.DefaultConfig, DisableFlyteClient: true},
"teardown": {CmdFunc: teardownDemoCluster, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: teardownShort,
Long: teardownLong},
Long: teardownLong, DisableFlyteClient: true},
"status": {CmdFunc: demoClusterStatus, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: statusShort,
Long: statusLong},
"exec": {CmdFunc: demoClusterExec, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: execShort,
Long: execLong},
Long: execLong, DisableFlyteClient: true},
}

cmdcore.AddCommands(demo, demoResourcesFuncs)
Expand Down
6 changes: 3 additions & 3 deletions cmd/sandbox/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ func CreateSandboxCommand() *cobra.Command {
sandboxResourcesFuncs := map[string]cmdcore.CommandEntry{
"start": {CmdFunc: startSandboxCluster, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: startShort,
Long: startLong, PFlagProvider: sandboxCmdConfig.DefaultConfig},
Long: startLong, PFlagProvider: sandboxCmdConfig.DefaultConfig, DisableFlyteClient: true},
"teardown": {CmdFunc: teardownSandboxCluster, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: teardownShort,
Long: teardownLong},
Long: teardownLong, DisableFlyteClient: true},
"status": {CmdFunc: sandboxClusterStatus, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: statusShort,
Long: statusLong},
"exec": {CmdFunc: sandboxClusterExec, Aliases: []string{}, ProjectDomainNotRequired: true,
Short: execShort,
Long: execLong},
Long: execLong, DisableFlyteClient: true},
}

cmdcore.AddCommands(sandbox, sandboxResourcesFuncs)
Expand Down
Loading

0 comments on commit 6a3744c

Please sign in to comment.