Skip to content
This repository has been archived by the owner on Oct 9, 2024. It is now read-only.

General improvements #57

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion helm-chart/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ spec:
value: "{{ .Values.tests.caCertFile }}"
- name: TLS_CLIENT_CERT_FILE
value: "{{ .Values.tests.clientCertFile }}"
- name: TLS_CLIENT_CERT_PRIVATE_KEY_File
- name: TLS_CLIENT_CERT_PRIVATE_KEY_FILE
value: "{{ .Values.tests.clientCertPrivateKeyFile }}"
- name: TLS_CA_CERT_DATA
value: "{{ .Values.tests.caCertData }}"
Expand Down
7 changes: 4 additions & 3 deletions worker/bench/driver_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ package bench
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
"golang.org/x/time/rate"
"time"
)

func (a *Activities) DriverActivity(ctx context.Context, request benchDriverActivityRequest) error {
Expand Down Expand Up @@ -71,7 +72,7 @@ func (d *benchDriver) run() error {
var completedIdx int
if err := activity.GetHeartbeatDetails(d.ctx, &completedIdx); err == nil {
idx = completedIdx + 1
d.logger.Info("resuming from failed attempt", "ReportedProgress", completedIdx)
d.logger.Warn("resuming from failed attempt", "ReportedProgress", completedIdx)
}
}

Expand Down Expand Up @@ -110,7 +111,7 @@ func (d *benchDriver) run() error {
}

func (d *benchDriver) execute(iterationID int) error {
d.logger.Info("driver.execute starting", "workflowName", d.request.WorkflowName, "basedID", d.request.BaseID, "iterationID", iterationID)
d.logger.Debug("driver.execute starting", "workflowName", d.request.WorkflowName, "basedID", d.request.BaseID, "iterationID", iterationID)
workflowID := fmt.Sprintf("%s-%s-%d", d.request.WorkflowName, d.request.BaseID, iterationID)
startOptions := client.StartWorkflowOptions{
ID: workflowID,
Expand Down
11 changes: 6 additions & 5 deletions worker/bench/monitor_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ package bench
import (
"context"
"fmt"
"strings"
"time"

"go.temporal.io/api/filter/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
"strings"
"time"
)

func (a *Activities) MonitorActivity(ctx context.Context, request benchMonitorActivityRequest) ([]histogramValue, error) {
Expand Down Expand Up @@ -132,7 +133,7 @@ func (m *benchMonitor) validateScenarioCompletion(deadline time.Time) ([]workflo
}

func (m *benchMonitor) isComplete() (bool, error) {
m.logger.Info("IsComplete? enter")
m.logger.Debug("IsComplete? enter")
filterStartTime := m.request.StartTime.Add(-10 * time.Second)
ws, err := m.client.ListOpenWorkflow(m.ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{
MaximumPageSize: 1,
Expand All @@ -146,11 +147,11 @@ func (m *benchMonitor) isComplete() (bool, error) {
},
})
if err != nil {
m.logger.Info("IsComplete? exit", "error", err)
m.logger.Debug("IsComplete? exit", "error", err)
return false, err
}
done := len(ws.Executions) == 0
m.logger.Info(fmt.Sprintf("IsComplete? %t", done))
m.logger.Debug(fmt.Sprintf("IsComplete? %t", done))
return done, nil
}

Expand Down
5 changes: 3 additions & 2 deletions worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import (
)

func main() {
logger, err := zap.NewDevelopment()
logger, err := zap.NewProduction()
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -295,6 +295,7 @@ func getTLSConfig(hostPort string, logger *zap.Logger) (*tls.Config, error) {
clientCertFile := getEnvOrDefaultString(logger, "TLS_CLIENT_CERT_FILE", "")
clientCertPrivateKeyFile := getEnvOrDefaultString(logger, "TLS_CLIENT_CERT_PRIVATE_KEY_FILE", "")
enableHostVerification := getEnvOrDefaultBool(logger, "TLS_ENABLE_HOST_VERIFICATION", false)
serverName := getEnvOrDefaultString(logger, "TLS_SERVER_NAME", host)

caBytes, err := getTLSBytes(caCertFile, caCertData)
if err != nil {
Expand Down Expand Up @@ -333,7 +334,7 @@ func getTLSConfig(hostPort string, logger *zap.Logger) (*tls.Config, error) {
if caPool != nil || cert != nil {
tlsConfig := &tls.Config{
InsecureSkipVerify: !enableHostVerification,
ServerName: host,
ServerName: serverName,
}
if caPool != nil {
tlsConfig.RootCAs = caPool
Expand Down
6 changes: 3 additions & 3 deletions worker/target/basic/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package basic
import (
"context"
"time"

"go.temporal.io/sdk/activity"
)

Expand All @@ -38,10 +38,10 @@ type basicActivityRequest struct {
// Activity is the implementation for Basic Workflow
func Activity(ctx context.Context, req basicActivityRequest) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Activity: start", "Duration, ms", req.ActivityDelayMilliseconds)
logger.Debug("Activity: start", "Duration, ms", req.ActivityDelayMilliseconds)
if req.ActivityDelayMilliseconds > 0 {
time.Sleep(time.Duration(req.ActivityDelayMilliseconds) * time.Millisecond)
}
logger.Info("Activity: end")
logger.Debug("Activity: end")
return req.ResultPayload, nil
}
8 changes: 4 additions & 4 deletions worker/target/basic/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func Workflow(ctx workflow.Context, request workflowRequest) (string, error) {

logger := workflow.GetLogger(ctx)

logger.Info("basic workflow started", "activity task queue", taskQueue)
logger.Debug("basic workflow started", "activity task queue", taskQueue)

ao := workflow.ActivityOptions{
TaskQueue: taskQueue,
StartToCloseTimeout: time.Duration(request.ActivityDurationMilliseconds)*time.Millisecond + 10 * time.Minute,
StartToCloseTimeout: time.Duration(request.ActivityDurationMilliseconds)*time.Millisecond + 10*time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, ao)

Expand Down Expand Up @@ -79,9 +79,9 @@ func Workflow(ctx workflow.Context, request workflowRequest) (string, error) {
allResults[i] = result
}

logger.Info("activity returned result to the workflow", "value", allResults)
logger.Debug("activity returned result to the workflow", "value", allResults)
}

logger.Info("basic workflow completed")
logger.Debug("basic workflow completed")
return request.ResultPayload, nil
}