Skip to content

Commit

Permalink
Merge pull request #98 from yashsinghcodes/fix-for-subflow-loop
Browse files Browse the repository at this point in the history
[fix]: subflow execution with .#1
  • Loading branch information
frikky authored Aug 20, 2024
2 parents bee2ee8 + 1ff194d commit bb5e77d
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 40 deletions.
14 changes: 7 additions & 7 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func IncrementCache(ctx context.Context, orgId, dataType string, amount ...int)

item := &gomemcache.Item{
Key: key,
Value: []byte(string(incrementAmount)),
Value: []byte(fmt.Sprintf("%d", incrementAmount)),
Expiration: 86400,
}

Expand All @@ -589,7 +589,7 @@ func IncrementCache(ctx context.Context, orgId, dataType string, amount ...int)
if item == nil || item.Value == nil {
item = &gomemcache.Item{
Key: key,
Value: []byte(string(incrementAmount)),
Value: []byte(fmt.Sprintf("%d", incrementAmount)),
Expiration: 86400,
}

Expand Down Expand Up @@ -7559,11 +7559,11 @@ func ListChildWorkflows(ctx context.Context, originalId string) ([]Workflow, err
filtered := []Workflow{}
handled := []string{}
for _, workflow := range workflows {
if ArrayContains(handled, string(workflow.Edited)) {
if ArrayContains(handled, fmt.Sprintf("%d", workflow.Edited)) {
continue
}

handled = append(handled, string(workflow.Edited))
handled = append(handled, fmt.Sprintf("%d", workflow.Edited))
filtered = append(filtered, workflow)
}

Expand Down Expand Up @@ -7729,11 +7729,11 @@ func ListWorkflowRevisions(ctx context.Context, originalId string) ([]Workflow,
filtered := []Workflow{}
handled := []string{}
for _, workflow := range workflows {
if ArrayContains(handled, string(workflow.Edited)) {
if ArrayContains(handled, fmt.Sprintf("%d", workflow.Edited)) {
continue
}

handled = append(handled, string(workflow.Edited))
handled = append(handled, fmt.Sprintf("%d", workflow.Edited))
filtered = append(filtered, workflow)
}

Expand Down Expand Up @@ -12679,7 +12679,7 @@ func ValidateFinished(ctx context.Context, extra int, workflowExecution Workflow
err := CreateOrgNotification(
ctx,
fmt.Sprintf("Workflow %s took too long to run. Time taken: %d seconds", workflowExecution.Workflow.Name, comparisonTime),
fmt.Sprintf("This notification is made when the execution takes more than 10 minutes.", workflowExecution.Workflow.Name, comparisonTime),
fmt.Sprintf("This notification is made when the execution takes more than 10 minutes."),
fmt.Sprintf("/workflows/%s?execution_id=%s&view=executions", workflowExecution.Workflow.ID, workflowExecution.ExecutionId),
workflowExecution.ExecutionOrg,
true,
Expand Down
2 changes: 1 addition & 1 deletion files.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ func HandleEditFile(resp http.ResponseWriter, request *http.Request) {

body, err := ioutil.ReadAll(request.Body)
if err != nil {
log.Println("[ERROR] Failed reading file body: %s", err)
log.Printf("[ERROR] Failed reading file body: %s", err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed to read data"}`)))
return
Expand Down
4 changes: 2 additions & 2 deletions notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func CreateOrgNotification(ctx context.Context, title, description, referenceUrl
}

if workflow.OrgId != mainNotification.OrgId {
log.Printf("[WARNING] Can't access workflow %s with org ID %s (%s): %s", workflow.ID, mainNotification.OrgId, workflow.Org)
log.Printf("[WARNING] Can't access workflow %s with org %s (%s): %s", workflow.ID, mainNotification.OrgName, mainNotification.OrgId, workflow.Org)

// Get parent org if it exists and check too
if len(org.ManagerOrgs) > 0 {
Expand Down Expand Up @@ -927,7 +927,7 @@ func HandleCreateNotification(resp http.ResponseWriter, request *http.Request) {

// Allows for execution authorization
if len(notification.ExecutionId) == 0 {
log.Printf("[INFO] User tried to create notification for execution %s with org id %s, but notification org id is %s", notification.ExecutionId, notification.OrgId)
log.Printf("[INFO] User tried to create notification for execution %s with org id %s, but notification org id is %s", notification.ExecutionId, orgId,notification.OrgId)
resp.WriteHeader(403)
resp.Write([]byte(`{"success": false}`))
return
Expand Down
10 changes: 5 additions & 5 deletions oauth2.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func GetOutlookBody(ctx context.Context, hook Hook, body []byte) string {

body, err = json.Marshal(parsedEmail)
if err != nil {
log.Println("[WARNING] Failed to marshal data from emails: %s", err)
log.Printf("[WARNING] Failed to marshal data from emails: %s", err)
return string(body)
}

Expand Down Expand Up @@ -481,7 +481,7 @@ func HandleNewGithubRegister(resp http.ResponseWriter, request *http.Request) {

_, err = HandleAlgoliaCreatorUpload(ctx, user, false, false)
if err != nil {
log.Printf("[ERROR] Failed making user %s' information public")
log.Printf("[ERROR] Failed making user %s' information public", user.Username)
}

log.Printf("Successful client setup for github?")
Expand Down Expand Up @@ -2559,7 +2559,7 @@ func handleIndividualEmailUploads(ctx context.Context, gmailClient *http.Client,
mail.Type = "new"
mappedData, err := json.Marshal(mail)
if err != nil {
log.Println("[WARNING] Failed to Marshal mail to send to webhook: %s", err)
log.Printf("[WARNING] Failed to Marshal mail to send to webhook: %s", err)
return message, err
}

Expand Down Expand Up @@ -2911,7 +2911,7 @@ func HandleGmailRouting(resp http.ResponseWriter, request *http.Request) {
mail.Type = "thread"
mappedData, err := json.Marshal(mail)
if err != nil {
log.Println("[WARNING] Failed to Marshal mail to send to webhook: %s (2)", err)
log.Printf("[WARNING] Failed to Marshal mail to send to webhook: %s (2)", err)
resp.WriteHeader(401)
continue
}
Expand Down Expand Up @@ -3665,7 +3665,7 @@ func GetOauth2ApplicationPermissionToken(ctx context.Context, user User, appAuth
}

if strings.Contains(string(body), "error") {
log.Printf("\n\n[ERROR] Oauth2 app RESPONSE: %s\n\nencoded: %#v", string(body))
log.Printf("\n\n[ERROR] Oauth2 app RESPONSE: %s\n\n", string(body))
}

// Parse out data like {"access_token":"ddpGSlBV4GhNhToPTLjHZSwbqRH6JUIv0QYPo6CW62NfAr","token_type":"Bearer","expires_in":1870}
Expand Down
61 changes: 37 additions & 24 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ func randStr(strSize int, randType string) string {
return string(bytes)
}

func isLoop(arg string) bool {
if strings.Contains(arg, "$") && (strings.HasSuffix(arg, ".#") || strings.Contains(arg, ".#.")){
return true
}

if strings.Contains(arg,"$") && strings.Contains(arg, ".#") {
pattern := `(^|\.)(#(\d+-\d+)?($|\.))`
re := regexp.MustCompile(pattern)
return strings.Contains(arg, "$") && re.MatchString(arg)
}
return false
}

func HandleSet2fa(resp http.ResponseWriter, request *http.Request) {
cors := HandleCors(resp, request)
if cors {
Expand Down Expand Up @@ -1843,7 +1856,7 @@ func AddAppAuthentication(resp http.ResponseWriter, request *http.Request) {
org.ActiveApps = append(org.ActiveApps, app.ID)
err = SetOrg(ctx, *org, org.Id)
if err != nil {
log.Printf("[WARNING] Failed setting app %s for org %s during appauth", org.Id)
log.Printf("[WARNING] Failed setting app %s for org %s during appauth", app.ID, org.Id)
} else {
DeleteCache(ctx, fmt.Sprintf("apps_%s", user.Id))
DeleteCache(ctx, fmt.Sprintf("apps_%s", user.ActiveOrg.Id))
Expand Down Expand Up @@ -3741,7 +3754,7 @@ func GetAction(workflowExecution WorkflowExecution, id, environment string) Acti
Label: trigger.Label,
ExecutionDelay: trigger.ExecutionDelay,
}
log.Printf("[DEBUG] Found trigger to be ran as app (?): %s!", trigger)
log.Printf("[DEBUG] Found trigger to be ran as app (?): %v!", trigger)
}
}

Expand Down Expand Up @@ -7246,7 +7259,7 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
if env.Name == action.Environment {
found = true
if env.Archived {
log.Printf("[DEBUG] Environment %s is archived. Changing to default.")
log.Printf("[DEBUG] Environment %s is archived. Changing to default.", env.Name)
action.Environment = defaultEnv
}

Expand Down Expand Up @@ -9935,7 +9948,7 @@ func DeleteUser(resp http.ResponseWriter, request *http.Request) {
log.Printf("[ERROR] Failed getting org '%s' in delete user: %s", foundUser.ActiveOrg.Id, err)
} else {
if foundUserOrg.SSOConfig.SSORequired && !ArrayContains(foundUser.ValidatedSessionOrgs, foundUserOrg.Id) {
log.Printf("[AUDIT] User %s (%s) does not have an active session in org with forced SSO %s, so forcing a re-login (aka logout).", foundUser.ActiveOrg.Id)
log.Printf("[AUDIT] User %s (%s) does not have an active session in org with forced SSO %s, so forcing a re-login (aka logout).", foundUser.Username, foundUser.Id, foundUser.ActiveOrg.Id)
foundUser.Session = ""
foundUser.ValidatedSessionOrgs = []string{}
}
Expand Down Expand Up @@ -10245,7 +10258,7 @@ func UpdateWorkflowAppConfig(resp http.ResponseWriter, request *http.Request) {
}

if tmpfields.Sharing != app.Sharing {
log.Printf("[INFO] Changing app sharing for %s to %s", app.ID, tmpfields.Sharing)
log.Printf("[INFO] Changing app sharing for %s to %t", app.ID, tmpfields.Sharing)
app.Sharing = tmpfields.Sharing

if project.Environment != "cloud" {
Expand Down Expand Up @@ -10410,7 +10423,7 @@ func DeleteWorkflowApp(resp http.ResponseWriter, request *http.Request) {
}

if (app.Public || app.Sharing) && project.Environment == "cloud" {
log.Printf("[WARNING] App %s being deleted is public. Shouldn't be allowed. Public: %s, Sharing: %s", app.Name, app.Public, app.Sharing)
log.Printf("[WARNING] App %s being deleted is public. Shouldn't be allowed. Public: %t, Sharing: %t", app.Name, app.Public, app.Sharing)

resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "reason": "Can't delete public apps. Stop sharing it first, then delete it."}`))
Expand Down Expand Up @@ -10549,7 +10562,7 @@ func HandleKeyValueCheck(resp http.ResponseWriter, request *http.Request) {
}

if workflowExecution.Status != "EXECUTING" {
log.Printf("[INFO] Workflow isn't executing and shouldn't be searching", workflowExecution.ExecutionId)
log.Printf("[INFO] Workflow (%s) isn't executing and shouldn't be searching", workflowExecution.ExecutionId)
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false, "Workflow isn't executing"}`))
return
Expand Down Expand Up @@ -12662,7 +12675,7 @@ func GetWorkflowAppConfig(resp http.ResponseWriter, request *http.Request) {
parentapp, err := GetApp(ctx, app.PublishedId, user, false)
if err == nil {
if parentapp.Owner == user.Id {
log.Printf("[AUDIT] Parent app owner %s got access to child app %s (%s)", user.Username, user.Id, app.Name, app.ID)
log.Printf("[AUDIT] Parent app owner %s (%s) got access to child app %s (%s)", user.Username, user.Id, app.Name, app.ID)
exit = false
//app, err := GetApp(ctx, fileId, User{}, false)
}
Expand Down Expand Up @@ -13749,7 +13762,7 @@ func updateExecutionParent(ctx context.Context, executionParent, returnValue, pa

if newresp.StatusCode != 200 {
log.Printf("[ERROR] Bad statuscode setting subresult (1) with URL %s: %d, %s. Input data: %s", resultUrl, newresp.StatusCode, string(body), string(data))
return errors.New(fmt.Sprintf("Bad statuscode: %s", newresp.StatusCode))
return errors.New(fmt.Sprintf("Bad statuscode: %d", newresp.StatusCode))
}

err = json.Unmarshal(body, &newExecution)
Expand Down Expand Up @@ -13779,7 +13792,7 @@ func updateExecutionParent(ctx context.Context, executionParent, returnValue, pa

selectedTrigger = trigger
for _, param := range trigger.Parameters {
if param.Name == "argument" && strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
if param.Name == "argument" && isLoop(param.Value) {
// Check if the .# exists, without .#0 or .#1 for digits
log.Printf("\n\n[DEBUG] IN LOOP CHECK RESULT\n\n")
//re := regexp.MustCompile(`\.\#(\d+)`)
Expand Down Expand Up @@ -13818,7 +13831,7 @@ func updateExecutionParent(ctx context.Context, executionParent, returnValue, pa
foundResult.Action = action

for _, param := range action.Parameters {
if param.Name == "argument" && strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
if param.Name == "argument" && isLoop(param.Value) {
isLooping = true
}

Expand Down Expand Up @@ -14093,13 +14106,13 @@ func updateExecutionParent(ctx context.Context, executionParent, returnValue, pa
)

if err != nil {
log.Printf("[ERROR] Error building subflow request: %s", subflowExecutionId, err)
log.Printf("[ERROR] Error building subflow (%s) request: %s", subflowExecutionId, err)
return err
}

newresp, err := topClient.Do(req)
if err != nil {
log.Printf("[ERROR] Error running subflow request: %s", subflowExecutionId, err)
log.Printf("[ERROR] Error running subflow (%s) request: %s", subflowExecutionId, err)
return err
}

Expand Down Expand Up @@ -14339,7 +14352,7 @@ func RunExecutionTranslation(ctx context.Context, actionResult ActionResult) {

parsedStatus := status.(float64)
if parsedStatus >= 300 {
log.Printf("[DEBUG] Found status in action result: %d", parsedStatus)
log.Printf("[DEBUG] Found status in action result: %f", parsedStatus)
return
}
} else {
Expand Down Expand Up @@ -15201,7 +15214,7 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
// Sets the value for the variable

if len(actionResult.Result) > 0 {
log.Printf("\n\n[DEBUG] SET EXEC VAR\n\n", execvar.Name)
log.Printf("\n\n[DEBUG] SET EXEC VAR %s\n\n", execvar.Name)
workflowExecution.ExecutionVariables[index].Value = actionResult.Result
} else {
log.Printf("\n\n[DEBUG] SKIPPING EXEC VAR\n\n")
Expand Down Expand Up @@ -16856,7 +16869,7 @@ func HandleDeleteCacheKey(resp http.ResponseWriter, request *http.Request) {

cacheKey, err = url.QueryUnescape(strings.Trim(cacheKey, " "))
if err != nil {
log.Printf("[WARNING] Failed to unescape cache key %s: %s", err)
log.Printf("[WARNING] Failed to unescape cache key %s: %s", cacheKey,err)
cacheKey = strings.Trim(cacheKey, " ")
}

Expand Down Expand Up @@ -18282,7 +18295,7 @@ func RunFixParentWorkflowResult(ctx context.Context, execution WorkflowExecution
setExecution = false
}

if param.Name == "argument" && strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
if param.Name == "argument" && isLoop(param.Value) {
isLooping = true
}

Expand Down Expand Up @@ -19567,7 +19580,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
//$Get_Offenses.# -> Allow to run more
for _, param := range trigger.Parameters {
if param.Name == "argument" {
if strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
if isLoop(param.Value) {
allowContinuation = true
break
}
Expand All @@ -19591,7 +19604,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
//$Get_Offenses.# -> Allow to run more
for _, param := range action.Parameters {
if param.Name == "argument" {
if strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
if isLoop(param.Value) {
allowContinuation = true
break
}
Expand Down Expand Up @@ -19908,7 +19921,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
if strings.ToLower(result.Action.Environment) != "cloud" {
log.Printf("[DEBUG][%s] SETTING user input result, and re-adding it to queue IF not in worker. Environment: %s", result.ExecutionId, result.Action.Environment)
if project.Environment == "worker" {
log.Printf("\n\n[DEBUG][%s] Worker user input restart. What do? Should we ever reach this point?\n\n")
log.Printf("\n\n[DEBUG][%s] Worker user input restart. What do? Should we ever reach this point?\n\n", project.Environment)
} else {

updateMade := true
Expand Down Expand Up @@ -19983,19 +19996,19 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h

if err != nil {
log.Printf("[ERROR] Failed creating request for stream during SKIPPED user input: %s", err)
return WorkflowExecution{}, ExecInfo{}, fmt.Sprintf("Execution action failed to skip. Contact support if this persists.", oldExecution.ExecutionId), errors.New("Execution action failed to skip. Contact support if this persists.")
return WorkflowExecution{}, ExecInfo{}, fmt.Sprintf("Execution (%s) action failed to skip. Contact support if this persists.", oldExecution.ExecutionId), errors.New("Execution action failed to skip. Contact support if this persists.")
}

newresp, err := topClient.Do(req)
if err != nil {
log.Printf("[ERROR] Failed sending request for stream during SKIPPED user input: %s", err)
return WorkflowExecution{}, ExecInfo{}, fmt.Sprintf("Execution action failed to skip during send. Contact support if this persists.", oldExecution.ExecutionId), errors.New("Execution action failed to skip during send. Contact support if this persists.")
return WorkflowExecution{}, ExecInfo{}, fmt.Sprintf("Execution (%s) action failed to skip during send. Contact support if this persists.", oldExecution.ExecutionId), errors.New("Execution action failed to skip during send. Contact support if this persists.")
}

defer newresp.Body.Close()
}

return WorkflowExecution{}, ExecInfo{}, fmt.Sprintf("Execution action skipped", oldExecution.ExecutionId), errors.New("User Input: Execution action skipped!")
return WorkflowExecution{}, ExecInfo{}, fmt.Sprintf("Execution (%s) action skipped", oldExecution.ExecutionId), errors.New("User Input: Execution action skipped!")
}

foundresult = result
Expand Down Expand Up @@ -24265,7 +24278,7 @@ func RunCategoryAction(resp http.ResponseWriter, request *http.Request) {
value.Fields = newFields

// Md5 based on sortedKeys. Could subhash key search work?
mappedString := fmt.Sprintf("%s-%s", selectedApp.ID, value.Label, strings.Join(sortedKeys, ""))
mappedString := fmt.Sprintf("%s %s-%s", selectedApp.ID, value.Label, strings.Join(sortedKeys, ""))
fieldHash = fmt.Sprintf("%x", md5.Sum([]byte(mappedString)))
discoverFile := fmt.Sprintf("file_%s", fieldHash)
file, err := GetFile(ctx, discoverFile)
Expand Down
28 changes: 28 additions & 0 deletions shared_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package shuffle

import (
"testing"
)

func TestIsLoop(t *testing.T) {
handlers := []struct {
arg string
expected bool
}{
{"$exec.#1-2", true},
{"$exec.#.value.#1", true},
{"$exec.#1", false},
{"$exec", false},
{"$exec.#1.value.#2", false},
{"$start_node.#", true},
{"\n$Change_Me\n.#3.value\n", false},
{"\n\n\n\n$Change_Me\n\n.\n#\n.\n\nvalue\n\n\n", true},
}

for _, tt := range handlers {
result := isLoop(tt.arg)
if result != tt.expected {
t.Errorf("isLoop(%s) = %v; expected %v", tt.arg, result, tt.expected)
}
}
}
2 changes: 1 addition & 1 deletion streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func HandleStreamWorkflow(resp http.ResponseWriter, request *http.Request) {

conn, ok := resp.(http.Flusher)
if !ok {
log.Printf("[ERROR] Flusher error: %s", ok)
log.Printf("[ERROR] Flusher error: %t", ok)
http.Error(resp, "Streaming supported on AppEngine", http.StatusInternalServerError)
return
}
Expand Down

0 comments on commit bb5e77d

Please sign in to comment.