Skip to content

Commit

Permalink
Added 'form' reads and executes of workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Oct 4, 2024
1 parent 5a30d86 commit 760b2f7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 55 deletions.
120 changes: 66 additions & 54 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -7811,7 +7811,6 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
// Nodechecks
foundNodes := []string{}

log.Printf("[DEBUG] all nodes: %d", len(allNodes))
for _, node := range allNodes {
for _, branch := range workflow.Branches {
if node == branch.DestinationID || node == branch.SourceID {
Expand All @@ -7823,18 +7822,6 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}

// Making sure to check if infinite loops exist
// This is now done during execution. Should be done on frontend & backend too
/*
for _, action := range workflow.Actions {
//log.Printf("[INFO] Checking childnodes for action %s (%s)", action.Label, action.ID)
childNodes := FindChildNodes(WorkflowExecution{Workflow: workflow}, action.ID, []string{}, []string{})
log.Printf("[INFO] Found %d childnodes for action %s (%s)", len(childNodes), action.Label, action.ID)
}
*/

// FIXME - append all nodes (actions, triggers etc) to one single array here
//log.Printf("PRE VARIABLES")
if len(foundNodes) != len(allNodes) || len(workflow.Actions) <= 0 {
// This shit takes a few seconds lol
if !workflow.IsValid {
Expand Down Expand Up @@ -9640,7 +9627,7 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}

// Check workflow.Sharing == private / public / org too
// Check the URL source path to include /form or /run
isOwner := false
if user.Id != workflow.Owner || len(user.Id) == 0 {
// Added org-reader as the user should be able to read everything in an org
Expand All @@ -9657,6 +9644,26 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
log.Printf("[AUDIT] Letting verified support admin %s access workflow %s", user.Username, workflow.ID)

isOwner = true

} else if workflow.Sharing == "form" {
log.Printf("[AUDIT] Letting user %s access workflow %s because it's a form. Sanitized format.", user.Username, workflow.ID)

// Execute-Only. No executions or impersonations.

// Remaking the workflow intirely to ONLY include relevant stuff, and be future-proof
//user.ActiveOrg.Id = workflow.OrgId

workflow = &Workflow{
Name: workflow.Name,
ID: workflow.ID,
Owner: workflow.Owner,
OrgId: workflow.OrgId,

Sharing: workflow.Sharing,
Description: workflow.Description,
InputQuestions: workflow.InputQuestions,
InputMarkdown: workflow.InputMarkdown,
}
} else {
log.Printf("[AUDIT] Wrong user %s (%s) for workflow '%s' (get workflow). Verified: %t, Active: %t, SupportAccess: %t, Username: %s", user.Username, user.Id, workflow.ID, user.Verified, user.Active, user.SupportAccess, user.Username)
resp.WriteHeader(401)
Expand Down Expand Up @@ -9715,7 +9722,7 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
}

// Never load without a node
if len(workflow.Actions) == 0 {
if len(workflow.Actions) == 0 && workflow.Sharing != "form" {
// Append
nodeId := uuid.NewV4().String()
workflow.Start = nodeId
Expand Down Expand Up @@ -9767,12 +9774,15 @@ func GetSpecificWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}

workflowapps, err := GetPrioritizedApps(ctx, user)
if err != nil {
log.Printf("[WARNING] Error: Failed getting workflowapps: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
workflowapps := []WorkflowApp{}
if len(user.Id) > 0 && len(user.ActiveOrg.Id) > 0 {
workflowapps, err = GetPrioritizedApps(ctx, user)
if err != nil {
log.Printf("[WARNING] Error: Failed getting workflowapps: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
}
}

// Handle app versions & upgrades
Expand Down Expand Up @@ -9988,7 +9998,6 @@ func DeleteUser(resp http.ResponseWriter, request *http.Request) {
}

// log.Printf("user active organization is %v: ", userInfo)

org, err := GetOrg(ctx, userInfo.ActiveOrg.Id)
if err != nil {
log.Printf("[ERROR] Failed getting org '%s' in delete user: %s", userInfo.ActiveOrg.Id, err)
Expand Down Expand Up @@ -11896,7 +11905,6 @@ func sendMailSendgrid(toEmail []string, subject, body string, emailApp bool, Bcc
}
}


parsedBody, err := json.Marshal(newBody)
if err != nil {
log.Printf("[ERROR] Failed to parse JSON in sendmail: %s", err)
Expand Down Expand Up @@ -15896,7 +15904,7 @@ func compressExecution(ctx context.Context, workflowExecution WorkflowExecution,
return workflowExecution, dbSave
}

// Recursively finds the child nodes of a node in execution and returns their ID.
// Recursively finds the child nodes of a node in execution and returns their ID.
// Used if e.g. a node in a branch is exited, and all children have to be stopped
// Also used during startup of a workflow to set all nodes to be SKIPPED that aren't in immediate use
func FindChildNodes(workflow Workflow, nodeId string, parents, handledBranches []string) []string {
Expand Down Expand Up @@ -20297,7 +20305,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
// Don't override workflow defaults
}

log.Printf("[DEBUG][%s] STARTING IF/ELSE NODE REMAPPING", workflowExecution.ExecutionId)
//log.Printf("[DEBUG][%s] STARTING IF/ELSE NODE REMAPPING", workflowExecution.ExecutionId)
for branchIndex, branch := range workflowExecution.Workflow.Branches {
if len(branch.SourceParent) == 0 {
continue
Expand Down Expand Up @@ -20336,24 +20344,24 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
Value: actionLabelParsed,

ActionField: "",
ID: uuid.NewV4().String(),
Name: "source",
Variant: "STATIC_VALUE",
ID: uuid.NewV4().String(),
Name: "source",
Variant: "STATIC_VALUE",
},
Condition: WorkflowAppActionParameter{
Value: "equals",

ID: uuid.NewV4().String(),
Name: "condition",
ID: uuid.NewV4().String(),
Name: "condition",
Variant: "STATIC_VALUE",
},
Destination: WorkflowAppActionParameter{
Value: "true",

ActionField: "",
ID: uuid.NewV4().String(),
Name: "destination",
Variant: "STATIC_VALUE",
ID: uuid.NewV4().String(),
Name: "destination",
Variant: "STATIC_VALUE",
},
}

Expand All @@ -20368,7 +20376,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
//elseCondition = append(elseCondition, newCondition)
}

log.Printf("[DEBUG][%s] ENDING IF/ELSE NODE REMAPPING", workflowExecution.ExecutionId)
//log.Printf("[DEBUG][%s] ENDING IF/ELSE NODE REMAPPING", workflowExecution.ExecutionId)

if workflowExecution.SubExecutionCount == 0 {
workflowExecution.SubExecutionCount = 1
Expand Down Expand Up @@ -21402,7 +21410,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
//log.Printf("\n\n\n[DEBUG][%s] STARTUP NODES (%d): %#v. Total actions: %#v\n\n\n", workflowExecution.ExecutionId, len(childNodes), childNodes, len(workflowExecution.Workflow.Actions))

for _, action := range workflowExecution.Workflow.Actions {
if action.ID == workflowExecution.Start {
if action.ID == workflowExecution.Start {
continue
}

Expand All @@ -21411,14 +21419,14 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
}

foundResult := false
for _, result := range workflowExecution.Results {
for _, result := range defaultResults {
if result.Action.ID == action.ID {
foundResult = true
break
}
}

if !foundResult {
if !foundResult {
defaultResults = append(defaultResults, ActionResult{
Action: action,
ExecutionId: workflowExecution.ExecutionId,
Expand Down Expand Up @@ -21687,7 +21695,6 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
}
}


if len(org.Defaults.KmsId) > 0 {
if len(allAuths) == 0 {
allAuths, err = GetAllWorkflowAppAuth(ctx, workflow.ExecutingOrg.Id)
Expand Down Expand Up @@ -21998,7 +22005,7 @@ func RunExecuteAccessValidation(request *http.Request, workflow *Workflow) (bool
if !sourceExecutionOk {
sourceExecution, sourceExecutionOk = request.URL.Query()["reference_execution"]
if !sourceExecutionOk {
log.Printf("[AUDIT] No source_execution or reference_execution in access validation")
//log.Printf("[AUDIT] No source_execution or reference_execution in access validation")
return false, ""
}
}
Expand Down Expand Up @@ -23966,7 +23973,6 @@ func GetExternalClient(baseUrl string) *http.Client {
httpProxy := os.Getenv("SHUFFLE_INTERNAL_HTTP_PROXY")
httpsProxy := os.Getenv("SHUFFLE_INTERNAL_HTTPS_PROXY")


transport := http.DefaultTransport.(*http.Transport)
transport.MaxIdleConnsPerHost = 100
transport.ResponseHeaderTimeout = time.Second * 60
Expand Down Expand Up @@ -24000,7 +24006,6 @@ func GetExternalClient(baseUrl string) *http.Client {
log.Printf("[INFO] Reading self signed certificates from custom dir '%s'", certDir)
}


files, err := os.ReadDir(certDir)
if err == nil && os.Getenv("SHUFFLE_CERT_DIR") != "" {
for _, file := range files {
Expand All @@ -24023,7 +24028,7 @@ func GetExternalClient(baseUrl string) *http.Client {

if (len(httpProxy) > 0 || len(httpsProxy) > 0) && baseUrl != "http://shuffle-backend:5001" {
//client = &http.Client{}
if len(httpProxy) > 0 && httpProxy != "noproxy"{
if len(httpProxy) > 0 && httpProxy != "noproxy" {
log.Printf("[INFO] Running with HTTP proxy %s (env: HTTP_PROXY)", httpProxy)

url_i := url.URL{}
Expand All @@ -24032,7 +24037,7 @@ func GetExternalClient(baseUrl string) *http.Client {
transport.Proxy = http.ProxyURL(url_proxy)
}
}
if len(httpsProxy) > 0 && httpsProxy != "noproxy"{
if len(httpsProxy) > 0 && httpsProxy != "noproxy" {
log.Printf("[INFO] Running with HTTPS proxy %s (env: HTTPS_PROXY)", httpsProxy)

url_i := url.URL{}
Expand All @@ -24042,7 +24047,7 @@ func GetExternalClient(baseUrl string) *http.Client {
}
}
} else {
// keeping this here for now
// keeping this here for now
if len(httpProxy) > 0 && httpProxy != "noproxy" {
log.Printf("[INFO] Running with HTTP proxy %s (env: HTTP_PROXY)", httpProxy)

Expand Down Expand Up @@ -27896,17 +27901,28 @@ func parseSubflowResults(ctx context.Context, result ActionResult) (ActionResult
return result, true
}

func ValidateRequestOverload(resp http.ResponseWriter, request *http.Request) error {
func ValidateRequestOverload(resp http.ResponseWriter, request *http.Request, amount ...int) error {
// 1. Get current amount of requests for the user
// 2. Check if the user is allowed to make more requests
// 3. If not, return error
// 4. If yes, continue and add the request to the list
// Use the GetCache() and SetCache() functions to store the request count

// Max amount per minute
maxAmount := 4
if len(amount) > 0 {
maxAmount = amount[0]
}

// Max amount per minute
foundIP := GetRequestIp(request)
if foundIP == "" || foundIP == "127.0.0.1" || foundIP == "::1" {

portRemoval := strings.Split(foundIP, ":")
if len(portRemoval) > 1 {
foundIP = strings.Join(portRemoval[:len(portRemoval)-1], ":")
}

//log.Printf("\n\n\nIP: %s\n\n\n", foundIP)
if foundIP == "" || foundIP == "127.0.0.1" || foundIP == "::1" || foundIP == "[::1]" {
log.Printf("[DEBUG] Skipping request overload check for IP: %s", foundIP)
return nil
}
Expand Down Expand Up @@ -27968,7 +27984,7 @@ func ValidateRequestOverload(resp http.ResponseWriter, request *http.Request) er
newList = append(newList, req)
}

if len(newList) > maxAmount {
if len(newList) >= maxAmount {
// FIXME: Should we add to the list even if we return an error?

return errors.New("Too many requests")
Expand Down Expand Up @@ -28406,10 +28422,8 @@ func HandleExecutionCacheIncrement(ctx context.Context, execution WorkflowExecut
}
}


// FIXME: Always fails:


func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
cors := HandleCors(resp, request)
if cors {
Expand Down Expand Up @@ -28474,7 +28488,7 @@ func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
continue
}

org, err := GetOrg(ctx, orgId)
org, err := GetOrg(ctx, orgId)
if err != nil {
log.Printf("[WARNING] Failed getting org during parent org loading %s: %s", org.Id, err)
resp.WriteHeader(500)
Expand All @@ -28486,7 +28500,7 @@ func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
if user.Id == orgUser.Id {
user.Role = orgUser.Role
user.ActiveOrg.Id = org.Id
orgUserFound = true
orgUserFound = true
}
}

Expand Down Expand Up @@ -28520,8 +28534,6 @@ func GetChildWorkflows(resp http.ResponseWriter, request *http.Request) {
}
}



// Access is granted -> get revisions
childWorkflows, err := ListChildWorkflows(ctx, workflow.ID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ type Workflow struct {
Description string `json:"description" datastore:"description,noindex"`
Start string `json:"start" datastore:"start"`
Owner string `json:"owner" datastore:"owner"`
Sharing string `json:"sharing" datastore:"sharing"`
Sharing string `json:"sharing" datastore:"sharing"` // Not really used outside of Forms.
Image string `json:"image,omitempty" datastore:"image,noindex"`
Org []OrgMini `json:"org,omitempty" datastore:"org"`
ExecutingOrg OrgMini `json:"execution_org,omitempty" datastore:"execution_org"`
Expand Down

0 comments on commit 760b2f7

Please sign in to comment.