diff --git a/src/dflow/workflow.py b/src/dflow/workflow.py index 5a923113..c0a8849c 100644 --- a/src/dflow/workflow.py +++ b/src/dflow/workflow.py @@ -393,23 +393,25 @@ def handle_reused_step(self, step, global_parameters, global_artifacts): keys = [] if group_key: keys.append(group_key) - for k, v in step.get("inputs", {}).get( - "parameters", {}).items(): + for k, v in sorted(step.get("inputs", {}).get( + "parameters", {}).items()): if k.startswith("dflow_artifact_key_"): - keys.append(v) + keys.append(v.value[v.value.find("/")+1:]) art_key = get_key(art, raise_error=False) - need_handle = False - for k in keys: + group_keys = [] + for i, k in enumerate(keys): if (art_key and art_key.endswith("%s/%s" % (k, name)))\ or (getattr(art, "modified", {}).get( "old_key", "").endswith("%s/%s" % (k, name))): - need_handle = True - if need_handle: - if config["overwrite_reused_artifact"]: - self.handle_reused_artifact(step, name, art) - else: + group_keys = keys[:i+1] + if config["overwrite_reused_artifact"]: + for group_key in group_keys: + self.handle_reused_artifact( + step, name, art, group_key) + else: + if len(group_keys) > 0: self.handle_reused_artifact_with_copy( - step, name, art) + step, name, art, group_keys[-1]) if hasattr(art, "globalName"): global_art = art.recover() global_art["name"] = art.globalName @@ -424,7 +426,7 @@ def handle_reused_step(self, step, global_parameters, global_artifacts): "lastHitTimestamp": step.finishedAt } - def handle_reused_artifact(self, step, name, art): + def handle_reused_artifact(self, step, name, art, group_key): art_key = get_key(art, raise_error=False) if hasattr(art, "modified"): key = art.modified["old_key"] @@ -436,10 +438,9 @@ def handle_reused_artifact(self, step, name, art): art_key_prefix = art_key if art_key_prefix.startswith(s3_config["prefix"]): art_key_prefix = art_key_prefix[len(s3_config["prefix"]):] - if art_key_prefix.endswith(name): - art_key_prefix = art_key_prefix[:-len(name)-1] + art_key_prefix = art_key_prefix[:art_key_prefix.find("/")+1] + art_key_prefix += group_key - group_key = step.inputs.parameters["dflow_group_key"].value if "%s-init-artifact" % group_key in self.reused_keys: return memoize_key = "%s-%s-init-artifact" % (self.id, group_key) @@ -471,12 +472,11 @@ def handle_reused_artifact(self, step, name, art): init_art["oss"] = {"key": s3_config["repo_prefix"] + art_key} init_step["outputs"]["artifacts"].append(init_art) - def handle_reused_artifact_with_copy(self, step, name, art): + def handle_reused_artifact_with_copy(self, step, name, art, group_key): old_key = get_key(art, raise_error=False) if old_key and old_key not in self.copied_keys: key = "%s%s/%s/%s" % ( - s3_config["prefix"], self.id, step.inputs.parameters[ - "dflow_group_key"].value, name) + s3_config["prefix"], self.id, group_key, name) logger.debug("copying artifact: %s -> %s" % (old_key, key)) copy_s3(old_key, key) set_key(art, key)