Skip to content

Commit

Permalink
drtprod: introduce target dependency
Browse files Browse the repository at this point in the history
Today there is no way to define a dependency on other targets. This is needed in certain cases — for example, when we want to copy certificates only after both clusters are ready.
This PR also resolves an issue with the roachprod command where stdin input was not considered. Now the command is executed in interactive mode.

Epic: None
Release note: None
  • Loading branch information
nameisbhaskar committed Oct 7, 2024
1 parent 3c99347 commit e484d03
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 41 deletions.
106 changes: 78 additions & 28 deletions pkg/cmd/drtprod/cli/commands/yamlprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"os"
"strings"
"sync"

"github.com/cockroachdb/cockroach/pkg/cmd/drtprod/helpers"
Expand All @@ -17,7 +18,7 @@ import (
)

// commandExecutor is responsible for executing the shell commands
var commandExecutor = helpers.ExecuteCmd
var commandExecutor = helpers.ExecuteCmdWithPrefix

// GetYamlProcessor creates a new Cobra command for processing a YAML file.
// The command expects a YAML file as an argument and runs the commands defined in it.
Expand Down Expand Up @@ -63,8 +64,10 @@ type step struct {

// target defines a target cluster with associated steps to be executed.
type target struct {
TargetName string `yaml:"target_name"` // Name of the target cluster
Steps []step `yaml:"steps"` // Steps to execute on the target cluster
TargetName string `yaml:"target_name"` // Name of the target cluster
DependentTargets []string `yaml:"dependent_targets"` // targets should complete before starting this target
Steps []step `yaml:"steps"` // Steps to execute on the target cluster
commands []*command
}

// yamlConfig represents the structure of the entire YAML configuration file.
Expand Down Expand Up @@ -137,50 +140,97 @@ func setEnv(environment map[string]string, displayOnly bool) error {
func processTargets(
ctx context.Context, targets []target, displayOnly bool, targetNames []string,
) error {
// targetNameMap is used to check all targets that are provided as user input
targetNameMap := make(map[string]struct{})
targetMap := make(map[string][]*command)
for _, tn := range targetNames {
targetNameMap[tn] = struct{}{}
}
for i := 0; i < len(targets); i++ {
targets[i].TargetName = os.ExpandEnv(targets[i].TargetName)
t := targets[i]
if _, ok := targetNameMap[t.TargetName]; len(targetNames) > 0 && !ok {
fmt.Printf("Ignoring execution for target %s\n", t.TargetName)
continue
}
// Generate the commands for each target's steps
targetSteps, err := generateCmdsFromSteps(t.TargetName, t.Steps)
if err != nil {
return err
}
targetMap[t.TargetName] = targetSteps
waitGroupTracker, err := buildTargetCmdsAndRegisterWaitGroups(targets, targetNameMap, targetNames)
if err != nil {
return err
}

// Use a WaitGroup to execute commands concurrently
// Use a WaitGroup to wait for commands executed concurrently
wg := sync.WaitGroup{}
for targetName, cmds := range targetMap {
for _, t := range targets {
if _, ok := targetNameMap[t.TargetName]; len(targetNames) > 0 && !ok {
// user provided targets do not have this target
continue
}
if displayOnly {
displayCommands(targetName, cmds)
// if we need to display, there is no wait group to wait for
displayCommands(t)
continue
}
wg.Add(1)
go func(tn string, commands []*command) {
err := executeCommands(ctx, tn, commands)
go func(theTarget target) {
// defer complete the wait group for the dependent targets to proceed
defer waitGroupTracker[theTarget.TargetName].Done()
defer wg.Done()
for _, dt := range theTarget.DependentTargets {
if twg, ok := waitGroupTracker[dt]; ok {
fmt.Printf("%s: waiting on <%s>\n", theTarget.TargetName, dt)
// wait on the dependent targets
// it would not matter if we wait sequentially as all dependent targets need to complete
twg.Wait()
}
}
err := executeCommands(ctx, theTarget.TargetName, theTarget.commands)
if err != nil {
fmt.Printf("%s: Error executing commands: %v\n", tn, err)
fmt.Printf("%s: Error executing commands: %v\n", theTarget.TargetName, err)
}
wg.Done()
}(targetName, cmds)
}(t)
}
// final wait for all targets to complete
wg.Wait()
return nil
}

// buildTargetCmdsAndRegisterWaitGroups expands environment variables in every
// target (and its dependent-target names), generates the command list for each
// selected target, and registers a per-target sync.WaitGroup in the returned
// tracker map.
//
// Each selected target gets a WaitGroup with a delta of 1; the executor marks
// it Done when that target finishes, and dependent targets wait on it before
// starting. Registering all groups up front means the later execution loop does
// not need to care about ordering.
//
// Targets not present in targetNameMap are skipped (only when the user supplied
// an explicit target list, i.e. len(targetNames) > 0); skipped targets are
// reported on stdout and get no tracker entry.
func buildTargetCmdsAndRegisterWaitGroups(
	targets []target, targetNameMap map[string]struct{}, targetNames []string,
) (map[string]*sync.WaitGroup, error) {
	// Tracker of target name -> WaitGroup used by dependents of that target.
	waitGroupTracker := make(map[string]*sync.WaitGroup)

	// An empty targetNames slice means "run everything"; only filter when the
	// user explicitly listed targets.
	filterActive := len(targetNames) > 0

	for i := range targets {
		// Expand environment variables in the target name and in each
		// dependent-target name. deps aliases the slice inside targets[i],
		// so writes through it update the target in place.
		targets[i].TargetName = os.ExpandEnv(targets[i].TargetName)
		deps := targets[i].DependentTargets
		for j := range deps {
			deps[j] = os.ExpandEnv(deps[j])
		}

		name := targets[i].TargetName
		if filterActive {
			if _, selected := targetNameMap[name]; !selected {
				fmt.Printf("Ignoring execution for target %s\n", name)
				continue
			}
		}

		// Register the wait group before execution begins so dependents can
		// always find it, regardless of goroutine start order.
		wg := &sync.WaitGroup{}
		wg.Add(1)
		waitGroupTracker[name] = wg

		// Build the concrete command list from this target's steps.
		cmds, err := generateCmdsFromSteps(name, targets[i].Steps)
		if err != nil {
			return waitGroupTracker, err
		}
		targets[i].commands = cmds
	}
	return waitGroupTracker, nil
}

// displayCommands prints the commands in stdout
func displayCommands(name string, cmds []*command) {
fmt.Printf("For target <%s>:\n", name)
for _, cmd := range cmds {
func displayCommands(t target) {
dependentTargetMsg := ""
if len(t.DependentTargets) > 0 {
dependentTargetMsg = fmt.Sprintf(" after [%s]", strings.Join(t.DependentTargets, ", "))
}
fmt.Printf("For target <%s>%s:\n", t.TargetName, dependentTargetMsg)
for _, cmd := range t.commands {
fmt.Printf("|-> %s\n", cmd)
for _, rCmd := range cmd.rollbackCmds {
fmt.Printf(" |-> (Rollback) %s\n", rCmd)
Expand Down
45 changes: 43 additions & 2 deletions pkg/cmd/drtprod/cli/commands/yamlprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,34 @@ environment:
t.Run("expect no failure", func(t *testing.T) {
name1Commands := make([]string, 0)
name2Commands := make([]string, 0)
depN1Commands := make([]string, 0)
depN1N2Commands := make([]string, 0)
depNotPresentCommands := make([]string, 0)
commandExecutor = func(ctx context.Context, logPrefix string, cmd string, args ...string) error {
if strings.HasPrefix(logPrefix, "name_value1") {
name1Commands = append(name1Commands, (&command{name: cmd, args: args}).String())
} else if strings.HasPrefix(logPrefix, "name_value2") {
name2Commands = append(name2Commands, (&command{name: cmd, args: args}).String())
} else if strings.HasPrefix(logPrefix, "dependent_target_n1") {
// expect that "name_value1" is complete by now
require.Equal(t, 6, len(name1Commands))
depN1Commands = append(depN1Commands, (&command{name: cmd, args: args}).String())
} else if strings.HasPrefix(logPrefix, "dependent_target_n2_n1") {
// expect that "name_value1" and "name_value2" is complete by now
require.Equal(t, 6, len(name1Commands))
require.Equal(t, 1, len(name2Commands))
depN1N2Commands = append(depN1N2Commands, (&command{name: cmd, args: args}).String())
} else if strings.HasPrefix(logPrefix, "dependent_target_not_present") {
depNotPresentCommands = append(depNotPresentCommands, (&command{name: cmd, args: args}).String())
}
return nil
}
require.Nil(t, processYaml(ctx, getTestYaml(), false, nil))
require.Equal(t, 6, len(name1Commands))
require.Equal(t, 1, len(name2Commands))
require.Equal(t, 1, len(depN1Commands))
require.Equal(t, 1, len(depN1N2Commands))
require.Equal(t, 1, len(depNotPresentCommands))
// the flags are maintained as map and can be in any sequence
require.True(t, strings.HasPrefix(name1Commands[0], "roachprod dummy1 name_value1 arg11"))
require.True(t, strings.Contains(name1Commands[0], "--clouds=gce"))
Expand Down Expand Up @@ -146,7 +163,31 @@ targets:
args:
- $NAME_2
- arg12
- target_name: dependent_target_n1
dependent_targets:
- $NAME_1
steps:
- command: dummy2
args:
- $NAME_2
- arg12
- target_name: dependent_target_n2_n1
dependent_targets:
- $NAME_2
- name_value1
- name_value1
steps:
- command: dummy2
args:
- $NAME_2
- arg12
- target_name: dependent_target_not_present
dependent_targets:
- not_present
steps:
- command: dummy2
args:
- $NAME_2
- arg12
`)
}
2 changes: 1 addition & 1 deletion pkg/cmd/drtprod/cli/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Initialize(ctx context.Context) {
if err != nil {
if strings.Contains(err.Error(), "unknown command") {
// Command not found, execute it in roachprod instead.
_ = helpers.ExecuteCmd(ctx, "roachprod", "roachprod", os.Args[1:]...)
_ = helpers.ExecuteCmdInteractive(ctx, "roachprod", os.Args[1:]...)
return
}
// If another error occurs, exit with a failure status.
Expand Down
30 changes: 30 additions & 0 deletions pkg/cmd/drtprod/configs/drt_chaos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,33 @@ targets:
- $WORKLOAD_CLUSTER
- workload
- script: "pkg/cmd/drtprod/configs/setup_datadog_workload"
- target_name: post_tasks
dependent_targets:
- $CLUSTER
- $WORKLOAD_CLUSTER
steps:
- command: get
args:
- $CLUSTER:1
- certs
- certs-$CLUSTER
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- sudo
- rm
- -rf
- certs
- command: put
args:
- $WORKLOAD_CLUSTER
- certs-$CLUSTER
- certs
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- chmod
- 600
- './certs/*'
30 changes: 30 additions & 0 deletions pkg/cmd/drtprod/configs/drt_large.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,33 @@ targets:
- $WORKLOAD_CLUSTER
- workload
- script: "pkg/cmd/drtprod/configs/setup_datadog_workload"
- target_name: post_tasks
dependent_targets:
- $CLUSTER
- $WORKLOAD_CLUSTER
steps:
- command: get
args:
- $CLUSTER:1
- certs
- certs-$CLUSTER
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- sudo
- rm
- -rf
- certs
- command: put
args:
- $WORKLOAD_CLUSTER
- certs-$CLUSTER
- certs
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- chmod
- 600
- './certs/*'
9 changes: 8 additions & 1 deletion pkg/cmd/drtprod/configs/drt_scale.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ targets:
- --
- "sudo systemctl unmask cron.service ; sudo systemctl enable cron.service ; echo \"crontab -l ; echo '@reboot sleep 100 && ~/cockroach.sh' | crontab -\" > t.sh ; sh t.sh ; rm t.sh"
# workload cluster specs
- target_name: $WORKLOAD_CLUSTER
steps:
- command: create
args:
- $WORKLOAD_CLUSTER
Expand Down Expand Up @@ -89,6 +91,11 @@ targets:
- $WORKLOAD_CLUSTER
- workload
- script: "pkg/cmd/drtprod/configs/setup_datadog_workload"
- target_name: post_tasks
dependent_targets:
- $CLUSTER
- $WORKLOAD_CLUSTER
steps:
- command: get
args:
- $CLUSTER:1
Expand All @@ -105,4 +112,4 @@ targets:
- --
- chmod
- 600
- ./certs/*
- './certs/*'
30 changes: 30 additions & 0 deletions pkg/cmd/drtprod/configs/drt_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,33 @@ targets:
username: workload
on_rollback:
- command: destroy
- target_name: post_tasks
dependent_targets:
- $CLUSTER
- $WORKLOAD_CLUSTER
steps:
- command: get
args:
- $CLUSTER:1
- certs
- certs-$CLUSTER
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- sudo
- rm
- -rf
- certs
- command: put
args:
- $WORKLOAD_CLUSTER
- certs-$CLUSTER
- certs
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- chmod
- 600
- './certs/*'
Loading

0 comments on commit e484d03

Please sign in to comment.