Skip to content

Commit

Permalink
Merge pull request #798 from deepmodeling/zjgemi
Browse files (browse the repository at this point in the history)
fix: handle reused artifacts for slices of slices
  • Loading branch information
zjgemi authored Apr 30, 2024
2 parents 38e337a + 107e6db commit 80b8809
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions src/dflow/workflow.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -393,23 +393,25 @@ def handle_reused_step(self, step, global_parameters, global_artifacts):
keys = []
if group_key:
keys.append(group_key)
for k, v in step.get("inputs", {}).get(
"parameters", {}).items():
for k, v in sorted(step.get("inputs", {}).get(
"parameters", {}).items()):
if k.startswith("dflow_artifact_key_"):
keys.append(v)
keys.append(v.value[v.value.find("/")+1:])
art_key = get_key(art, raise_error=False)
need_handle = False
for k in keys:
group_keys = []
for i, k in enumerate(keys):
if (art_key and art_key.endswith("%s/%s" % (k, name)))\
or (getattr(art, "modified", {}).get(
"old_key", "").endswith("%s/%s" % (k, name))):
need_handle = True
if need_handle:
if config["overwrite_reused_artifact"]:
self.handle_reused_artifact(step, name, art)
else:
group_keys = keys[:i+1]
if config["overwrite_reused_artifact"]:
for group_key in group_keys:
self.handle_reused_artifact(
step, name, art, group_key)
else:
if len(group_keys) > 0:
self.handle_reused_artifact_with_copy(
step, name, art)
step, name, art, group_keys[-1])
if hasattr(art, "globalName"):
global_art = art.recover()
global_art["name"] = art.globalName
Expand All @@ -424,7 +426,7 @@ def handle_reused_step(self, step, global_parameters, global_artifacts):
"lastHitTimestamp": step.finishedAt
}

def handle_reused_artifact(self, step, name, art):
def handle_reused_artifact(self, step, name, art, group_key):
art_key = get_key(art, raise_error=False)
if hasattr(art, "modified"):
key = art.modified["old_key"]
Expand All @@ -436,10 +438,9 @@ def handle_reused_artifact(self, step, name, art):
art_key_prefix = art_key
if art_key_prefix.startswith(s3_config["prefix"]):
art_key_prefix = art_key_prefix[len(s3_config["prefix"]):]
if art_key_prefix.endswith(name):
art_key_prefix = art_key_prefix[:-len(name)-1]
art_key_prefix = art_key_prefix[:art_key_prefix.find("/")+1]
art_key_prefix += group_key

group_key = step.inputs.parameters["dflow_group_key"].value
if "%s-init-artifact" % group_key in self.reused_keys:
return
memoize_key = "%s-%s-init-artifact" % (self.id, group_key)
Expand Down Expand Up @@ -471,12 +472,11 @@ def handle_reused_artifact(self, step, name, art):
init_art["oss"] = {"key": s3_config["repo_prefix"] + art_key}
init_step["outputs"]["artifacts"].append(init_art)

def handle_reused_artifact_with_copy(self, step, name, art):
def handle_reused_artifact_with_copy(self, step, name, art, group_key):
old_key = get_key(art, raise_error=False)
if old_key and old_key not in self.copied_keys:
key = "%s%s/%s/%s" % (
s3_config["prefix"], self.id, step.inputs.parameters[
"dflow_group_key"].value, name)
s3_config["prefix"], self.id, group_key, name)
logger.debug("copying artifact: %s -> %s" % (old_key, key))
copy_s3(old_key, key)
set_key(art, key)
Expand Down

0 comments on commit 80b8809

Please sign in to comment.