Skip to content

Commit

Permalink
drtprod: introduce target dependency
Browse files Browse the repository at this point in the history
Today there is no way to define dependency on other targets. This is needed in certain cases. e.g. if we want to copy certificates after both the clusters are ready.
This PR also resolves an issue with roachprod command where the stdin input was not considered. Now, the command is executed with the interactive mode.

Some more changes done for 150 node:
schema_change is created without cron entry as we need manual control on it

Epic: None
Release note: None
  • Loading branch information
nameisbhaskar committed Oct 25, 2024
1 parent 82b1fda commit 2c50fda
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 42 deletions.
108 changes: 79 additions & 29 deletions pkg/cmd/drtprod/cli/commands/yamlprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"os"
"strings"
"sync"

"github.com/cockroachdb/cockroach/pkg/cmd/drtprod/helpers"
Expand All @@ -17,7 +18,7 @@ import (
)

// commandExecutor is responsible for executing the shell commands
var commandExecutor = helpers.ExecuteCmd
var commandExecutor = helpers.ExecuteCmdWithPrefix

// GetYamlProcessor creates a new Cobra command for processing a YAML file.
// The command expects a YAML file as an argument and runs the commands defined in it.
Expand Down Expand Up @@ -63,8 +64,10 @@ type step struct {

// target defines a target cluster with associated steps to be executed.
type target struct {
TargetName string `yaml:"target_name"` // Name of the target cluster
Steps []step `yaml:"steps"` // Steps to execute on the target cluster
TargetName string `yaml:"target_name"` // Name of the target cluster
DependentTargets []string `yaml:"dependent_targets"` // targets should complete before starting this target
Steps []step `yaml:"steps"` // Steps to execute on the target cluster
commands []*command
}

// yamlConfig represents the structure of the entire YAML configuration file.
Expand Down Expand Up @@ -137,56 +140,103 @@ func setEnv(environment map[string]string, displayOnly bool) error {
func processTargets(
ctx context.Context, targets []target, displayOnly bool, targetNames []string,
) error {
// targetNameMap is used to check all targets that are provided as user input
targetNameMap := make(map[string]struct{})
targetMap := make(map[string][]*command)
for _, tn := range targetNames {
targetNameMap[tn] = struct{}{}
}
for i := 0; i < len(targets); i++ {
targets[i].TargetName = os.ExpandEnv(targets[i].TargetName)
t := targets[i]
if _, ok := targetNameMap[t.TargetName]; len(targetNames) > 0 && !ok {
fmt.Printf("Ignoring execution for target %s\n", t.TargetName)
continue
}
// Generate the commands for each target's steps
targetSteps, err := generateCmdsFromSteps(t.TargetName, t.Steps)
if err != nil {
return err
}
targetMap[t.TargetName] = targetSteps
waitGroupTracker, err := buildTargetCmdsAndRegisterWaitGroups(targets, targetNameMap, targetNames)
if err != nil {
return err
}

// Use a WaitGroup to execute commands concurrently
// Use a WaitGroup to wait for commands executed concurrently
wg := sync.WaitGroup{}
for targetName, cmds := range targetMap {
for _, t := range targets {
if _, ok := targetNameMap[t.TargetName]; len(targetNames) > 0 && !ok {
// the targets provided as "--targets" does not contain the current target
// so, this target is skipped
continue
}
if displayOnly {
displayCommands(targetName, cmds)
// if we need to display, there is no wait group to wait for
displayCommands(t)
continue
}
wg.Add(1)
go func(tn string, commands []*command) {
err := executeCommands(ctx, tn, commands)
go func(t target) {
// defer complete the wait group for the dependent targets to proceed
defer waitGroupTracker[t.TargetName].Done()
defer wg.Done()
for _, dt := range t.DependentTargets {
if twg, ok := waitGroupTracker[dt]; ok {
fmt.Printf("%s: waiting on <%s>\n", t.TargetName, dt)
// wait on the dependent targets
// it would not matter if we wait sequentially as all dependent targets need to complete
twg.Wait()
}
}
err := executeCommands(ctx, t.TargetName, t.commands)
if err != nil {
fmt.Printf("%s: Error executing commands: %v\n", tn, err)
fmt.Printf("%s: Error executing commands: %v\n", t.TargetName, err)
}
wg.Done()
}(targetName, cmds)
}(t)
}
// final wait for all targets to complete
wg.Wait()
return nil
}

// buildTargetCmdsAndRegisterWaitGroups builds the commands per target and registers the target to a wait group
// tracker and returns the same.
// The wait group tracker is a map of target name to a wait group. A delta is added to the wait group that is
// marked done when the specific target is complete. The wait group is use by the dependent targets to wait for
// the completion of the target.
func buildTargetCmdsAndRegisterWaitGroups(
targets []target, targetNameMap map[string]struct{}, targetNames []string,
) (map[string]*sync.WaitGroup, error) {
// map of target name to a wait group. The wait group is used by dependent target to wait for the target to complete
waitGroupTracker := make(map[string]*sync.WaitGroup)

// iterate over all the targets and create all the commands that should be executed for the target
for i := 0; i < len(targets); i++ {
// expand the environment variables
targets[i].TargetName = os.ExpandEnv(targets[i].TargetName)
t := targets[i]
for j := 0; j < len(t.DependentTargets); j++ {
targets[i].DependentTargets[j] = os.ExpandEnv(targets[i].DependentTargets[j])
}
if _, ok := targetNameMap[t.TargetName]; len(targetNames) > 0 && !ok {
fmt.Printf("Ignoring execution for target %s\n", t.TargetName)
continue
}
// add a delta wait for this target. This is added here so that when the execution loop is run, we need not
// worry about the sequence
waitGroupTracker[t.TargetName] = &sync.WaitGroup{}
waitGroupTracker[t.TargetName].Add(1)
// Generate the commands for each target's steps
targetSteps, err := generateCmdsFromSteps(t.TargetName, t.Steps)
if err != nil {
return waitGroupTracker, err
}
targets[i].commands = targetSteps
}
return waitGroupTracker, nil
}

// displayCommands prints the commands in stdout
func displayCommands(name string, cmds []*command) {
fmt.Printf("For target <%s>:\n", name)
for _, cmd := range cmds {
func displayCommands(t target) {
if len(t.DependentTargets) > 0 {
fmt.Printf("For target <%s> after [%s]:\n", t.TargetName, strings.Join(t.DependentTargets, ", "))
} else {
fmt.Printf("For target <%s>:\n", t.TargetName)
}
for _, cmd := range t.commands {
fmt.Printf("|-> %s\n", cmd)
for _, rCmd := range cmd.rollbackCmds {
fmt.Printf(" |-> (Rollback) %s\n", rCmd)
}
}

}

// executeCommands runs the list of commands for a specific target.
Expand Down
45 changes: 43 additions & 2 deletions pkg/cmd/drtprod/cli/commands/yamlprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,34 @@ environment:
t.Run("expect no failure", func(t *testing.T) {
name1Commands := make([]string, 0)
name2Commands := make([]string, 0)
depN1Commands := make([]string, 0)
depN1N2Commands := make([]string, 0)
depNotPresentCommands := make([]string, 0)
commandExecutor = func(ctx context.Context, logPrefix string, cmd string, args ...string) error {
if strings.HasPrefix(logPrefix, "name_value1") {
name1Commands = append(name1Commands, (&command{name: cmd, args: args}).String())
} else if strings.HasPrefix(logPrefix, "name_value2") {
name2Commands = append(name2Commands, (&command{name: cmd, args: args}).String())
} else if strings.HasPrefix(logPrefix, "dependent_target_n1") {
// expect that "name_value1" is complete by now
require.Equal(t, 6, len(name1Commands))
depN1Commands = append(depN1Commands, (&command{name: cmd, args: args}).String())
} else if strings.HasPrefix(logPrefix, "dependent_target_n2_n1") {
// expect that "name_value1" and "name_value2" is complete by now
require.Equal(t, 6, len(name1Commands))
require.Equal(t, 1, len(name2Commands))
depN1N2Commands = append(depN1N2Commands, (&command{name: cmd, args: args}).String())
} else if strings.HasPrefix(logPrefix, "dependent_target_not_present") {
depNotPresentCommands = append(depNotPresentCommands, (&command{name: cmd, args: args}).String())
}
return nil
}
require.Nil(t, processYaml(ctx, getTestYaml(), false, nil))
require.Equal(t, 6, len(name1Commands))
require.Equal(t, 1, len(name2Commands))
require.Equal(t, 1, len(depN1Commands))
require.Equal(t, 1, len(depN1N2Commands))
require.Equal(t, 1, len(depNotPresentCommands))
// the flags are maintained as map and can be in any sequence
require.True(t, strings.HasPrefix(name1Commands[0], "roachprod dummy1 name_value1 arg11"))
require.True(t, strings.Contains(name1Commands[0], "--clouds=gce"))
Expand Down Expand Up @@ -146,7 +163,31 @@ targets:
args:
- $NAME_2
- arg12
- target_name: dependent_target_n1
dependent_targets:
- $NAME_1
steps:
- command: dummy2
args:
- $NAME_2
- arg12
- target_name: dependent_target_n2_n1
dependent_targets:
- $NAME_2
- name_value1
- name_value1
steps:
- command: dummy2
args:
- $NAME_2
- arg12
- target_name: dependent_target_not_present
dependent_targets:
- not_present
steps:
- command: dummy2
args:
- $NAME_2
- arg12
`)
}
2 changes: 1 addition & 1 deletion pkg/cmd/drtprod/cli/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Initialize(ctx context.Context) {
if err != nil {
if strings.Contains(err.Error(), "unknown command") {
// Command not found, execute it in roachprod instead.
_ = helpers.ExecuteCmd(ctx, "roachprod", "roachprod", os.Args[1:]...)
_ = helpers.ExecuteCmdInteractive(ctx, "roachprod", os.Args[1:]...)
return
}
// If another error occurs, exit with a failure status.
Expand Down
34 changes: 34 additions & 0 deletions pkg/cmd/drtprod/configs/drt_chaos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,40 @@ targets:
- $WORKLOAD_CLUSTER
- workload
- script: "pkg/cmd/drtprod/scripts/setup_datadog_workload"
- target_name: post_tasks
dependent_targets:
- $CLUSTER
- $WORKLOAD_CLUSTER
steps:
- script: rm
args:
- -rf
- certs-$CLUSTER
- command: get
args:
- $CLUSTER:1
- certs
- certs-$CLUSTER
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- sudo
- rm
- -rf
- certs
- command: put
args:
- $WORKLOAD_CLUSTER
- certs-$CLUSTER
- certs
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- chmod
- 600
- './certs/*'
- script: "pkg/cmd/drtprod/scripts/tpcc_init.sh"
args:
- cct_tpcc # suffix added to script name tpcc_init_cct_tpcc.sh
Expand Down
34 changes: 34 additions & 0 deletions pkg/cmd/drtprod/configs/drt_large.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,40 @@ targets:
- $WORKLOAD_CLUSTER
- workload
- script: "pkg/cmd/drtprod/scripts/setup_datadog_workload"
- target_name: post_tasks
dependent_targets:
- $CLUSTER
- $WORKLOAD_CLUSTER
steps:
- script: rm
args:
- -rf
- certs-$CLUSTER
- command: get
args:
- $CLUSTER:1
- certs
- certs-$CLUSTER
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- sudo
- rm
- -rf
- certs
- command: put
args:
- $WORKLOAD_CLUSTER
- certs-$CLUSTER
- certs
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- chmod
- 600
- './certs/*'
- script: "pkg/cmd/drtprod/scripts/tpcc_init.sh"
args:
- cct_tpcc # suffix added to script name tpcc_init_cct_tpcc.sh
Expand Down
21 changes: 20 additions & 1 deletion pkg/cmd/drtprod/configs/drt_scale.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,21 @@ targets:
- $CLUSTER
- --
- "sudo systemctl unmask cron.service ; sudo systemctl enable cron.service ; echo \"crontab -l ; echo '@reboot sleep 100 && ~/cockroach.sh' | crontab -\" > t.sh ; sh t.sh ; rm t.sh"
- command: sql
args:
- $CLUSTER:1
- --
- -e
- "ALTER RANGE timeseries CONFIGURE ZONE USING num_replicas=5,num_voters=5"
- command: sql
args:
- $CLUSTER:1
- --
- -e
- "ALTER RANGE default CONFIGURE ZONE USING num_replicas=5,num_voters=5"
# workload cluster specs
- target_name: $WORKLOAD_CLUSTER
steps:
- command: create
args:
- $WORKLOAD_CLUSTER
Expand Down Expand Up @@ -84,6 +98,11 @@ targets:
- $WORKLOAD_CLUSTER
- workload
- script: "pkg/cmd/drtprod/scripts/setup_datadog_workload"
- target_name: post_tasks
dependent_targets:
- $CLUSTER
- $WORKLOAD_CLUSTER
steps:
- script: rm
args:
- -rf
Expand All @@ -104,7 +123,7 @@ targets:
- --
- chmod
- 600
- certs/*
- './certs/*'
- command: put
args:
- $WORKLOAD_CLUSTER
Expand Down
22 changes: 22 additions & 0 deletions pkg/cmd/drtprod/configs/drt_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,25 @@ targets:
username: workload
on_rollback:
- command: destroy
- target_name: post_tasks
dependent_targets:
- $CLUSTER
- $WORKLOAD_CLUSTER
steps:
- command: get
args:
- $CLUSTER:1
- certs
- certs-$CLUSTER
- command: put
args:
- $WORKLOAD_CLUSTER
- certs-$CLUSTER
- certs
- command: ssh
args:
- $WORKLOAD_CLUSTER
- --
- chmod
- 600
- './certs/*'
Loading

0 comments on commit 2c50fda

Please sign in to comment.