diff --git a/src/dflow/argo_objects.py b/src/dflow/argo_objects.py index c847ceb6..4be771be 100644 --- a/src/dflow/argo_objects.py +++ b/src/dflow/argo_objects.py @@ -310,6 +310,12 @@ def get_duration(self) -> datetime.timedelta: max_k8s_resource_name_length = 253 k8s_naming_hash_length = 10 +FNV_32_PRIME = 0x01000193 +FNV1_32_INIT = 0x811c9dc5 + + +def get_hash(node_name): + return fnva(node_name.encode(), FNV1_32_INIT, FNV_32_PRIME, 2**32) def get_pod_name(wf_name, node_name, template_name, node_id): @@ -319,14 +325,10 @@ def get_pod_name(wf_name, node_name, template_name, node_id): max_prefix_length = max_k8s_resource_name_length - k8s_naming_hash_length if len(prefix) > max_prefix_length - 1: prefix = prefix[:max_prefix_length-1] - hash_val = fnva(node_name.encode(), FNV1_32_INIT, FNV_32_PRIME, 2**32) + hash_val = get_hash(node_name) return "%s-%s" % (prefix, hash_val) -FNV_32_PRIME = 0x01000193 -FNV1_32_INIT = 0x811c9dc5 - - def fnva(data, hval_init, fnv_prime, fnv_size): """ Alternative FNV hash algorithm used in FNV-1a. diff --git a/src/dflow/workflow.py b/src/dflow/workflow.py index 9b3bb45b..36da2c40 100644 --- a/src/dflow/workflow.py +++ b/src/dflow/workflow.py @@ -6,7 +6,7 @@ from copy import deepcopy from typing import Any, Dict, List, Optional, Union -from .argo_objects import ArgoStep, ArgoWorkflow +from .argo_objects import ArgoStep, ArgoWorkflow, get_hash from .common import jsonpickle, subdomain_errmsg, subdomain_regex from .config import config, s3_config from .context import Context @@ -492,7 +492,10 @@ def convert_to_argo(self, reuse_step=None): data = {} if step.key is None: continue - key2id[step.key] = step.id + node_name = self.id + step.name[len(step.workflow):] + hash_val = get_hash(node_name) + new_id = "%s-%s" % (self.id, hash_val) + key2id[step.key] = new_id self.handle_reused_step(step, global_parameters, global_artifacts) @@ -1109,9 +1112,11 @@ def query_step_by_key( workflow = self.query( fields=['metadata.name'] + [ 'status.nodes.' + key2id[k] for k in key]) - return workflow.get_step(name=name, phase=phase, id=id, type=type) + steps = workflow.get_step(name=name, phase=phase, id=id, type=type) + assert len(steps) > 0 + return steps except Exception: - logger.warning("Key-ID map not found in the global outputs, " + logger.warning("Key(s) not found in the global outputs, " "downgrade to full query") return self.query_step(key=key, name=name, phase=phase, id=id, type=type)