Skip to content

Commit

Permalink
Merge pull request #794 from deepmodeling/zjgemi
Browse files Browse the repository at this point in the history
fix: update key-ID map in global outputs according to the new workflo…
  • Loading branch information
zjgemi authored Apr 12, 2024
2 parents d800dbf + cf2156b commit 1026c4b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
12 changes: 7 additions & 5 deletions src/dflow/argo_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,12 @@ def get_duration(self) -> datetime.timedelta:

max_k8s_resource_name_length = 253
k8s_naming_hash_length = 10
FNV_32_PRIME = 0x01000193
FNV1_32_INIT = 0x811c9dc5


def get_hash(node_name):
return fnva(node_name.encode(), FNV1_32_INIT, FNV_32_PRIME, 2**32)


def get_pod_name(wf_name, node_name, template_name, node_id):
Expand All @@ -319,14 +325,10 @@ def get_pod_name(wf_name, node_name, template_name, node_id):
max_prefix_length = max_k8s_resource_name_length - k8s_naming_hash_length
if len(prefix) > max_prefix_length - 1:
prefix = prefix[:max_prefix_length-1]
hash_val = fnva(node_name.encode(), FNV1_32_INIT, FNV_32_PRIME, 2**32)
hash_val = get_hash(node_name)
return "%s-%s" % (prefix, hash_val)


FNV_32_PRIME = 0x01000193
FNV1_32_INIT = 0x811c9dc5


def fnva(data, hval_init, fnv_prime, fnv_size):
"""
Alternative FNV hash algorithm used in FNV-1a.
Expand Down
13 changes: 9 additions & 4 deletions src/dflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from copy import deepcopy
from typing import Any, Dict, List, Optional, Union

from .argo_objects import ArgoStep, ArgoWorkflow
from .argo_objects import ArgoStep, ArgoWorkflow, get_hash
from .common import jsonpickle, subdomain_errmsg, subdomain_regex
from .config import config, s3_config
from .context import Context
Expand Down Expand Up @@ -492,7 +492,10 @@ def convert_to_argo(self, reuse_step=None):
data = {}
if step.key is None:
continue
key2id[step.key] = step.id
node_name = self.id + step.name[len(step.workflow):]
hash_val = get_hash(node_name)
new_id = "%s-%s" % (self.id, hash_val)
key2id[step.key] = new_id
self.handle_reused_step(step, global_parameters,
global_artifacts)

Expand Down Expand Up @@ -1109,9 +1112,11 @@ def query_step_by_key(
workflow = self.query(
fields=['metadata.name'] + [
'status.nodes.' + key2id[k] for k in key])
return workflow.get_step(name=name, phase=phase, id=id, type=type)
steps = workflow.get_step(name=name, phase=phase, id=id, type=type)
assert len(steps) > 0
return steps
except Exception:
logger.warning("Key-ID map not found in the global outputs, "
logger.warning("Key(s) not found in the global outputs, "
"downgrade to full query")
return self.query_step(key=key, name=name, phase=phase, id=id,
type=type)
Expand Down

0 comments on commit 1026c4b

Please sign in to comment.